25 Spark 3.0 Feature II, DPP: How to Use the DPP Feature #

Hello, I’m Wu Lei.

DPP (Dynamic Partition Pruning) is the second noteworthy feature in Spark 3.0. In star schema joins, it uses the filter applied to a dimension table to greatly reduce the amount of data scanned in the fact table, which improves the overall performance of the join.

In today’s lecture, we will use an example in an e-commerce scenario to explain what partition pruning is, what dynamic partition pruning is, its purpose, usage, and considerations. This will help you learn how to effectively use DPP in one go.

Partition Pruning #

Let’s start with an example. In a star schema data warehouse, we have two tables: the orders table and the users table. The orders table is the fact table, and the users table is the dimension table. The business requirement is to calculate the total revenue contributed by each head (top) user and sort the results in descending order. How can we achieve this requirement?

First, let’s understand the key fields of the two tables to determine how the query should be written.

// Key fields in the orders table
userId, Int
itemId, Int
price, Float
quantity, Int
 
// Key fields in the users table
id, Int
name, String
type, String // Enum value: 'Head User' or 'Tail User'

Given the above tables, we only need to perform an inner join between the two tables, and then perform grouping, aggregation, and sorting to achieve the business logic. The specific query is as follows.

select sum(orders.price * orders.quantity) as income, users.name
from orders inner join users on orders.userId = users.id
where users.type = 'Head User'
group by users.name
order by income desc
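To see what this query computes, here is a minimal sketch that runs an aggregate form of it (with sum over price times quantity) against an in-memory SQLite database. SQLite only stands in for the warehouse here, and the sample rows and user names are invented for illustration; nothing about Spark's execution is modeled.

```python
import sqlite3

# In-memory SQLite database standing in for the warehouse (illustration only).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table users (id integer, name text, type text);
    create table orders (userId integer, itemId integer, price real, quantity integer);
    insert into users values
        (1, 'Alice', 'Head User'), (2, 'Bob', 'Tail User'), (3, 'Carol', 'Head User');
    insert into orders values
        (1, 10, 5.0, 2), (1, 11, 3.0, 1), (2, 10, 5.0, 4), (3, 12, 20.0, 1);
""")

query = """
    select sum(orders.price * orders.quantity) as income, users.name
    from orders inner join users on orders.userId = users.id
    where users.type = 'Head User'
    group by users.name
    order by income desc
"""
rows = conn.execute(query).fetchall()
print(rows)  # [(20.0, 'Carol'), (13.0, 'Alice')]
```

Bob's orders never reach the result, yet in the query as written nothing tells the scan of orders to skip them. That gap is what the rest of this lecture is about.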

With this query, and combining our knowledge from the Spark SQL lessons, we can quickly draw its logical execution plan.

Since the fact table in the query does not have any filtering conditions, Spark SQL performs a full table scan on the left side of the execution plan to project the fields userId, price, and quantity. Conversely, the dimension table has the filtering condition users.type = 'Head User', so Spark SQL can apply the predicate pushdown rule to push the filter down to the data source and reduce the disk I/O required.

Predicate pushdown already helps, but I/O efficiency can improve further if the dimension table supports partition pruning. So, what is partition pruning? It is a special case of predicate pushdown: predicates are pushed down to a partitioned table, and the dataset is filtered at the granularity of file system directories. A partitioned table is one created with a partition key, either through the partitioned by clause of a create table statement or by writing columnar files (such as Parquet or ORC) with the partitionBy method.

Compared to regular tables, partitioned tables store data in a special way. For each distinct value of the partition key, a partitioned table creates a separate subdirectory in the file system to hold the corresponding data shards. Take the users table as an example: if it is partitioned on the type field, it has two subdirectories, one prefixed “Head User” and one prefixed “Tail User” (in a Hive-style layout, the directory names take the form type=Head User). Each record is stored in the subdirectory that matches its type value, so all “Head User” records sit in the “Head User” subdirectory and all “Tail User” records sit in the “Tail User” subdirectory.

It follows that if the filtering predicate contains the partition key, Spark SQL can skip (prune) every directory that does not satisfy the predicate when it scans a partitioned table. For example, in our query, the filtering predicate on the users table is users.type = 'Head User'. If the users table is a partitioned table, Spark SQL can completely skip the subdirectory with the prefix “Tail User” when scanning it.
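The directory layout and the pruning step can be sketched in a few lines of plain Python. This is a toy model, not Spark code: the helper names write_partitioned and scan_with_pruning, the sample users, and the in-memory dictionary that plays the role of the file system are all assumptions made for illustration.

```python
from collections import defaultdict

def write_partitioned(rows, partition_key):
    """Group rows into Hive-style 'key=value' directories, one per distinct key value."""
    dirs = defaultdict(list)
    for row in rows:
        dirs[f"{partition_key}={row[partition_key]}"].append(row)
    return dict(dirs)

def scan_with_pruning(dirs, partition_key, wanted_value):
    """Read only the directory whose name matches the predicate; skip all others."""
    scanned = [d for d in dirs if d == f"{partition_key}={wanted_value}"]
    rows = [row for d in scanned for row in dirs[d]]
    return scanned, rows

# Hypothetical sample data for the users dimension table.
users = [
    {"id": 1, "name": "Alice", "type": "Head User"},
    {"id": 2, "name": "Bob", "type": "Tail User"},
    {"id": 3, "name": "Carol", "type": "Head User"},
    {"id": 4, "name": "Dave", "type": "Tail User"},
]

users_dirs = write_partitioned(users, "type")
scanned_dirs, head_users = scan_with_pruning(users_dirs, "type", "Head User")

print(sorted(users_dirs))  # ['type=Head User', 'type=Tail User']
print(scanned_dirs)        # ['type=Head User']
```

The point of the sketch is that the decision to skip a directory is made from its name alone, before any file inside it is opened.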

Comparing it with predicate pushdown makes the power of partition pruning easy to see. As shown in the figure, the two rows represent Spark SQL's scan of the users table without and with partitioning, respectively. Without partitioning, all data shards of the users table are stored in the same file system directory. Because the Parquet format records statistics for the type field in each file footer, Spark SQL can use predicate pushdown to skip shards that cannot contain matching rows. However, many shards contain both 'Head User' and 'Tail User' records (the three light green data shards in the first row), so the scan of the users table still involves four data shards.

In contrast, when the users table is a partitioned table, all records whose type is 'Head User' are stored in the subdirectory with the prefix 'Head User', which is the light green file system directory in the second row of the figure. This directory contains only two data shards, and both hold only 'Head User' records. As a result, Spark SQL can completely skip the other subdirectories, which significantly improves I/O efficiency.
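The comparison can be reproduced with a toy model of footer statistics. The shard contents below are invented, chosen only so that the counts match the ones quoted above (four shards scanned without partitioning, two with). The min/max check is a simplified stand-in for the statistics a Parquet footer provides, not the actual reader logic.

```python
def can_skip(shard, wanted):
    """Footer-style min/max check: skip a shard only if `wanted` lies outside [min, max]."""
    return not (min(shard) <= wanted <= max(shard))

H, T = "Head User", "Tail User"

# Unpartitioned layout: six shards in one directory, several of them mixed (hypothetical).
unpartitioned = [
    [H, H, H, H],  # pure head shard
    [H, T, T, T],  # mixed
    [H, H, T, T],  # mixed
    [H, T, T, T],  # mixed
    [T, T, T, T],  # pure tail shard, skippable from its statistics
    [T, T, T, T],  # pure tail shard, skippable from its statistics
]
scanned_with_pushdown = [s for s in unpartitioned if not can_skip(s, H)]

# Partitioned layout: the same head records packed into their own directory.
shard_size = 4
head_records = [v for s in unpartitioned for v in s if v == H]
scanned_with_pruning = -(-len(head_records) // shard_size)  # ceiling division

print(len(scanned_with_pushdown))  # 4
print(scanned_with_pruning)        # 2
```

Mixed shards are what hurt pushdown: a single 'Head User' record forces the whole shard to be read, while partitioning guarantees that no shard in the scanned directory is mixed.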

You may say, “Since partition pruning is so powerful, can’t we apply it to the fact table as well? After all, the fact table has a larger volume, and there is more room for improvement in I/O efficiency compared to the dimension table.” That’s right, if the fact table itself is a partitioned table, and the filtering predicate contains the partition key, then Spark SQL will also use the partition pruning feature to greatly reduce the amount of data to be scanned.

However, for the majority of real-world join queries, the fact table does not meet the prerequisites for partition pruning: either the fact table is not a partitioned table, or there is no filtering predicate on the fact table, or the filtering predicate does not contain the partition key. In our e-commerce example, the query has no filtering predicate on the orders table at all. Therefore, even if the orders table is a partitioned table, Spark SQL cannot use partition pruning on it.

For such join queries, should we just let Spark SQL perform a full scan on the fact table? In the past, there was not much we could do. However, with the introduction of the Dynamic Partition Pruning (DPP) feature in Spark 3.0, the situation has changed.

Dynamic Partition Pruning #

As we mentioned earlier, DPP refers to the use of filtering information provided by dimension tables in Spark SQL to reduce the amount of data scanned in fact tables, lower I/O overhead, and improve execution performance. So how does DPP achieve this? What is the logic behind it? To help you understand, let’s use the previous example to explain.

First, the filter condition users.type = 'Head User' will help the dimension table filter out some data. At the same time, the ID field in the dimension table will also be filtered, as shown in step 1 in the diagram. After this filtering, the retained ID values are just a subset of the full set of IDs in the dimension table.

Then, under the join relationship orders.userId = users.id, the filtering effect will propagate from the ID field of the dimension table to the userId field of the fact table, as shown in step 2 in the diagram. As a result, the userId values that satisfy the join relationship are also a subset of the full set of userId in the fact table. By using the satisfied userId as a filter condition and applying it to the data source of the fact table, the data scanned can be reduced, and I/O efficiency can be improved, as shown in step 3 in the diagram.
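The three steps can be traced with a small pure-Python sketch. It assumes the orders table is partitioned by userId and models each partition as a dictionary entry; the sample rows and variable names are hypothetical, and this is a picture of the reasoning rather than of Spark's implementation.

```python
# Hypothetical dimension table.
users = [
    {"id": 1, "name": "Alice", "type": "Head User"},
    {"id": 2, "name": "Bob", "type": "Tail User"},
    {"id": 3, "name": "Carol", "type": "Head User"},
    {"id": 4, "name": "Dave", "type": "Tail User"},
]

# Hypothetical fact table partitioned by userId: partition value -> (price, quantity) rows.
orders_partitions = {
    1: [(5.0, 2), (3.0, 1)],
    2: [(5.0, 4)],
    3: [(20.0, 1)],
    4: [(1.0, 7)],
}

# Step 1: the filter on the dimension table leaves only a subset of its ids.
surviving_ids = {u["id"] for u in users if u["type"] == "Head User"}

# Step 2: the equi-join orders.userId = users.id carries that subset over to orders.userId.
# Step 3: only the partitions whose userId is in the subset are scanned.
scanned = {uid: rows for uid, rows in orders_partitions.items() if uid in surviving_ids}

print(sorted(surviving_ids))  # [1, 3]
print(sorted(scanned))        # [1, 3]
```

Partitions 2 and 4 are never read, even though the query itself contains no predicate on the orders table.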

DPP is based on the above logic to propagate the filtering conditions on the dimension table to the fact table, thereby optimizing the fact table. Although the operation logic of DPP is very clear, not all data association scenarios can enjoy the optimization mechanism of DPP. To use DPP to accelerate the reading and accessing of fact table data, the data association scenario needs to meet three additional conditions.

First, DPP is a partition pruning mechanism, so it filters the fact table on a per-partition basis. Following the logic above, the filter conditions on the dimension table are converted into filter conditions on the Join Key of the fact table, which in our example is the field orders.userId. The prerequisite for DPP to take effect is therefore that the fact table has been partitioned on the column orders.userId. In other words, the fact table must be a partitioned table, and its partition fields (there can be more than one) must include the Join Key.

Second, the propagation of the filtering effect relies on an equi-join, such as orders.userId = users.id. Therefore, DPP only supports equi-joins and does not support non-equi join conditions such as greater than or less than.

In addition, the implementation of the DPP mechanism also has an implicit condition: the data set after filtering on the dimension table must be smaller than the broadcast threshold.

Taking the dimension table users as an example, the data set that satisfies the filter condition users.type = 'Head User' needs to be able to fit into a broadcast variable in order for the DPP optimization mechanism to take effect. Why is that? This brings us to the implementation principle of the DPP mechanism.

From the logic analyzed above, the key to implementing DPP is to let the branch of the plan that processes the fact table obtain the list of Join Keys that satisfy the filtering conditions, and then use this list to prune the fact table partitions. So how can it obtain this list?

Spark SQL chooses a “kill two birds with one stone” approach: using a broadcast variable to encapsulate the dimension table data after filtering. Specifically, after the dimension table is filtered, Spark SQL constructs a hash table on it, and the Key of this hash table is the Join Key used for association. In our example, the Key is the users.id that satisfies the condition users.type = 'Head User'; the Value is the data columns that need to be referenced in the projection. In the earlier query over the orders table and the users table, the referenced column is users.name.

After the hash table is constructed, Spark SQL encapsulates it into a broadcast variable. This broadcast variable has two roles. The first role is to be used by the fact table for partition pruning, as shown in step 1 in the diagram. The Key Set in the hash table can be used to filter the partitions of the fact table that meet the conditions.

The second role is to participate in the subsequent Broadcast Join for data association, as shown in step 2 in the diagram. The hash table here is essentially the Build Table in Hash Join, and its Key and Value record all the fields required for data association, such as users.id, users.name, which can be used for Broadcast Hash Join with the fact table.
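The two roles of the hash table can be sketched as follows. A plain Python dictionary plays the part of the broadcast hash table, and the sample data and variable names are hypothetical; broadcasting itself is not modeled, only how a single structure serves both the pruning step and the join.

```python
from collections import defaultdict

# Hypothetical dimension table.
users = [
    {"id": 1, "name": "Alice", "type": "Head User"},
    {"id": 2, "name": "Bob", "type": "Tail User"},
    {"id": 3, "name": "Carol", "type": "Head User"},
]

# Hypothetical fact table partitioned by userId: partition value -> (price, quantity) rows.
orders_partitions = {
    1: [(5.0, 2), (3.0, 1)],
    2: [(5.0, 4)],
    3: [(20.0, 1)],
}

# Build side: Key = join key (users.id), Value = projected column (users.name).
build_table = {u["id"]: u["name"] for u in users if u["type"] == "Head User"}

# Role 1: the key set of the hash table prunes the fact table partitions.
pruned = {uid: rows for uid, rows in orders_partitions.items() if uid in build_table}

# Role 2: the same hash table is probed during the hash join and aggregation.
income = defaultdict(float)
for uid, rows in pruned.items():
    for price, quantity in rows:
        income[build_table[uid]] += price * quantity

result = sorted(income.items(), key=lambda kv: kv[1], reverse=True)
print(result)  # [('Carol', 20.0), ('Alice', 13.0)]
```

Because the hash table is built once and used twice, pruning adds no second pass over the dimension table. It also means the table has to be small enough to broadcast.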

As you can see, because Spark SQL implements DPP with broadcast variables, the filtered dimension table must fit into a broadcast variable for the optimization to take effect. This is why we must pay close attention to the configuration parameter spark.sql.autoBroadcastJoinThreshold.
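As a minimal sketch of how these settings might be passed to a job, the snippet below formats them as spark-submit --conf flags. Both configuration keys are real Spark SQL settings, but the 64 MB threshold is an arbitrary illustrative value, not a recommendation; the right value depends on the size of your filtered dimension table and on executor memory.

```python
# Spark SQL settings relevant to DPP. The threshold value is illustrative only.
dpp_conf = {
    # Switch for dynamic partition pruning.
    "spark.sql.optimizer.dynamicPartitionPruning.enabled": "true",
    # Broadcast threshold in bytes: 64 MB here, chosen arbitrarily for the example.
    "spark.sql.autoBroadcastJoinThreshold": str(64 * 1024 * 1024),
}

# Render each setting as a spark-submit flag.
flags = [f"--conf {key}={value}" for key, value in dpp_conf.items()]
print(" \\\n  ".join(flags))
```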

Summary #

In this lecture, we focused on dynamic partition pruning and learned about the connection and difference between predicate pushdown and partition pruning, as well as the definition, characteristics, and usage of dynamic partition pruning.

Compared to predicate pushdown, partition pruning often provides better I/O efficiency for disk access.

This is because predicate pushdown usually filters files based on the statistics in file footers, so its effect depends on the “purity” of each file's content. Partition pruning is different. A partitioned table isolates files with different content into different directories of the file system. A filtering condition that includes the partition key can therefore filter disk files at the granularity of file system directories, which greatly improves the I/O efficiency of disk access.

Dynamic partition pruning is mainly used in the join scenarios of star schema data warehouses. It means that at runtime, Spark SQL uses the filtering information provided by dimension tables to reduce the amount of fact table data scanned and lower I/O overhead, thereby improving execution performance.

The logic behind the operation of dynamic partition pruning is to transmit the filtering conditions in the dimension table to the fact table through the association relationship to optimize the fact table. In data association scenarios, developers need to pay attention to three points in order to make good use of the dynamic partition pruning feature:

  • The fact table must be a partitioned table, and the partition key must include the join key.
  • Dynamic partition pruning only supports equi-joins and does not support non-equivalent relationships such as greater than or less than.
  • The data set after dimension table filtering must be smaller than the broadcast threshold, so developers need to adjust the configuration item spark.sql.autoBroadcastJoinThreshold.
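These three points can be condensed into a small checklist function. It is only a reading aid that restates the conditions above; the function name and parameters are hypothetical, and it does not reproduce the checks Spark's optimizer actually performs.

```python
def dpp_can_apply(fact_partition_cols, fact_join_key, join_operator,
                  filtered_dim_bytes, broadcast_threshold_bytes):
    """Return True when all three conditions listed above hold."""
    partitioned_on_join_key = fact_join_key in fact_partition_cols
    is_equi_join = join_operator == "="
    fits_in_broadcast = filtered_dim_bytes < broadcast_threshold_bytes
    return partitioned_on_join_key and is_equi_join and fits_in_broadcast

TEN_MB = 10 * 1024 * 1024  # example threshold in bytes

# orders partitioned by userId, equi-join, 2 MB of filtered users: all conditions hold.
ok = dpp_can_apply(["userId"], "userId", "=", 2 * 1024 * 1024, TEN_MB)

# Same query, but orders is partitioned by a different column: condition 1 fails.
wrong_partition = dpp_can_apply(["itemId"], "userId", "=", 2 * 1024 * 1024, TEN_MB)

# Filtered dimension table larger than the threshold: condition 3 fails.
too_big = dpp_can_apply(["userId"], "userId", "=", 50 * 1024 * 1024, TEN_MB)

print(ok, wrong_partition, too_big)  # True False False
```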

Daily Practice #

  1. If you were asked to rewrite the DPP (Dynamic Partition Pruning) implementation mechanism, would you consider removing the restriction on the broadcast threshold? (Hint: Abandon the use of Broadcast Hash Join for association, but still use broadcast variables for partition pruning.)
  2. In order to obtain a list of Join Keys that satisfy the conditions for the fact table, besides using broadcast variables, do you think there are any other methods or approaches?

Looking forward to seeing your thoughts and answers in the comments section. See you in the next class!