23 Tungsten Execution Plan - What Blessings Does Tungsten Bring to Developers #

Hello, I am Wu Lei.

In the previous two lessons, we learned that in the intelligent brain of Spark SQL, the “left brain” Catalyst optimizer is responsible for transforming the query statement into an executable Physical Plan. However, directly handing over the Physical Plan to Spark for execution is not the optimal choice. The best choice is to hand it over to the “right brain” Tungsten for further optimization.

Tungsten, also known as the Tungsten Project, focuses on two improvements in the core engine: data structure design and Whole Stage Code Generation (WSCG).

In today’s lesson, let’s talk about the design intention of Tungsten, the problems it solves with these two improvements, and the performance benefits it brings to developers.

Design of Tungsten in terms of data structures #

Compared to Spark Core, Tungsten has made two significant improvements in data structures: compact binary format Unsafe Row and memory page management. Let’s discuss them one by one.

Unsafe Row: Binary Data Structure #

Unsafe Row is a byte array that stores a single data record. Take user records with the schema shown in the diagram below (userID, name, age, gender) as an example. In general, all fields are placed in the array in the order specified by the schema. For fixed-length fields, the value is written directly into the field's position. For variable-length fields, that position holds an offset instead, and the field's length and value are stored later in the array at that offset. We walked through a more detailed example in Lesson 9, which you can revisit.
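To make the layout concrete, here is a minimal Java sketch of the idea described above. It is a simplification under stated assumptions, not Spark's actual UnsafeRow: the class name BinaryRowSketch, the 4-byte offsets, and the 4-byte length prefixes are all chosen for illustration, and the real format has further details that are left out here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified sketch of a compact binary row for the schema (userID, name, age, gender).
// Assumption: field widths and the class name are illustrative, not Spark's real layout.
final class BinaryRowSketch {
    // Fixed region: userID (8 bytes) | name offset (4) | age (4) | gender offset (4)
    static final int FIXED_SIZE = 8 + 4 + 4 + 4;

    static byte[] encode(long userId, String name, int age, String gender) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        byte[] genderBytes = gender.getBytes(StandardCharsets.UTF_8);
        int nameOffset = FIXED_SIZE;                          // variable-length data starts here
        int genderOffset = nameOffset + 4 + nameBytes.length; // after name's length + value
        ByteBuffer buf = ByteBuffer.allocate(genderOffset + 4 + genderBytes.length);
        buf.putLong(userId);       // fixed-length field: value stored in place
        buf.putInt(nameOffset);    // variable-length field: offset stored in place
        buf.putInt(age);           // fixed-length field: value stored in place
        buf.putInt(genderOffset);  // variable-length field: offset stored in place
        buf.putInt(nameBytes.length).put(nameBytes);     // length, then value
        buf.putInt(genderBytes.length).put(genderBytes); // length, then value
        return buf.array();
    }

    static long userId(byte[] row) { return ByteBuffer.wrap(row).getLong(0); }
    static String name(byte[] row) { return readString(row, ByteBuffer.wrap(row).getInt(8)); }
    static int age(byte[] row) { return ByteBuffer.wrap(row).getInt(12); }
    static String gender(byte[] row) { return readString(row, ByteBuffer.wrap(row).getInt(16)); }

    // Follow the stored offset, read the length prefix, then decode the value.
    private static String readString(byte[] row, int offset) {
        int length = ByteBuffer.wrap(row).getInt(offset);
        return new String(row, offset + 4, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] row = encode(42L, "Mike", 30, "M");
        System.out.println(name(row) + " is stored in one array of " + row.length + " bytes");
    }
}
```

Whatever the exact field widths, the point is that the whole record lives in one byte array, so reading a field is an offset calculation rather than a chain of object dereferences.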

So, what are the advantages of this storage format? Let’s think about this question from the opposite perspective: what would happen if we stored data entries with the same schema using the traditional JVM object approach?

The JVM would need at least 6 objects to store a single user record. Among them, GenericMutableRow encapsulates the record, and an Array holds the actual data values. Each element in the Array is itself an object, such as BoxedInteger for integers, String for strings, and so on. This storage approach has two obvious drawbacks.

First, it incurs high storage overhead. Take the name field of type String as an example. If a user’s name is “Mike”, the raw data should take up only 4 bytes. But as a JVM object, “Mike” consumes a total of 48 bytes: 12 bytes for the object header, 8 bytes for the hash code, 8 bytes for the field value, and an additional 20 bytes of other overhead. Going from 4 bytes to 48 bytes, the storage overhead is evident.

Second, the more objects there are in the JVM’s heap memory, the lower the efficiency of garbage collection (GC). Ideally, a data record would be encapsulated in a single object. However, as we can see from the diagram below, the JVM needs at least 6 objects to store one data record. If you have a trillion samples, the JVM has to manage at least six trillion objects, which significantly increases GC pressure.

Now let’s look at UnsafeRow again. By using the byte array storage format, UnsafeRow removes most of this storage overhead and encapsulates a data record in a single array object, significantly reducing GC pressure. It is fair to say that UnsafeRow brings considerable potential performance benefits. However, Tungsten doesn’t stop there. To unify the management of off-heap and on-heap memory, and to further improve data storage efficiency and GC efficiency, Tungsten also introduces a memory management mode based on memory pages.

Memory Page-Based Memory Management #

In order to unify the management of off-heap and on-heap memory spaces, Tungsten defines a unified 128-bit memory address called the Tungsten address. The Tungsten address is divided into two parts: the first 64 bits are reserved for Java objects, and the last 64 bits are the offset address. However, even though both off-heap and on-heap memory spaces use 128-bit Tungsten addresses, their addressing methods are completely different.

For the on-heap Tungsten addresses, the first 64 bits contain references or pointers to JVM heap objects, and the last 64 bits (offset) store the offset address of the data within that object. On the other hand, off-heap memory space is completely different. In the off-heap space, since Spark directly manages operating system memory using the Java Unsafe API and there are no memory objects, the first 64 bits store a null value, and the last 64 bits are used to directly address the memory space of the operating system in the off-heap space.

Clearly, managing on-heap memory in the Tungsten mode is more complex than off-heap memory. This is because addressing on-heap data requires two steps: first, locating the JVM object through the 64-bit object reference, and then, combining the offset provided to find the desired data in the on-heap memory space. The relationship between the JVM object address and the offset address is similar to the relationship between the starting address of an array and the offset address of array elements. Once the starting address and the offset address are given, the system can quickly locate the data element. Therefore, in the two steps mentioned earlier, locating the JVM object through the Object reference is crucial. Next, we will focus on explaining this process.

As shown in the above figure, Tungsten uses a data structure called the page table to record the mapping from the Object reference to the JVM object address. The page table records one memory page after another, and the memory page is essentially a JVM object. As long as a 64-bit Object reference is given, Tungsten can easily obtain the JVM object address through the page table and complete the addressing.
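The two-step addressing described above can be sketched in a few lines of Java. This is only an illustration under assumptions: the class PageTableSketch and its methods are hypothetical names, each memory page is modeled as a long[], and an address is modeled as a page number plus an offset within that page rather than a real 128-bit value.

```java
// Sketch of page-table addressing for on-heap data.
// Assumptions: a memory page is a long[], an address is (page number, offset in page),
// and all names here are hypothetical rather than Spark's API.
final class PageTableSketch {
    private final long[][] pageTable; // each entry is one memory page, i.e. one JVM object
    private int allocatedPages = 0;

    PageTableSketch(int maxPages) {
        pageTable = new long[maxPages][];
    }

    // Allocate a new page and return its page number.
    int allocatePage(int words) {
        pageTable[allocatedPages] = new long[words];
        return allocatedPages++;
    }

    void write(int pageNumber, int offset, long value) {
        pageTable[pageNumber][offset] = value;
    }

    long read(int pageNumber, int offset) {
        long[] page = pageTable[pageNumber]; // step 1: look up the JVM object in the page table
        return page[offset];                 // step 2: apply the offset inside that object
    }

    public static void main(String[] args) {
        PageTableSketch memory = new PageTableSketch(4);
        int page = memory.allocatePage(8);
        memory.write(page, 3, 99L);
        System.out.println(memory.read(page, 3));
    }
}
```

Note that however many values are written, the garbage collector only sees one object per page.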

So, what are the benefits of using this approach in memory management for Tungsten? Let’s take the commonly used HashMap data structure as an example to compare the HashMap implementation in the Java standard library (java.util.HashMap) and the HashMap in the Tungsten mode.

The Java standard library implements HashMap with an array plus linked lists. As shown in the above figure, array elements store the hash code and the head of the linked list. Each linked list node stores three elements: the Key reference, the Value reference, and the address of the next element. Generally, if an interviewer asks you to implement a HashMap, this is the approach you would adopt.
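For reference, the array plus linked list design can be sketched as follows. This is the kind of minimal interview-style implementation the text mentions, not the source of java.util.HashMap; the class name ChainedMapSketch is hypothetical, and resizing and null keys are left out.

```java
// Minimal separate-chaining map: an array of buckets, each bucket a linked list of nodes.
// Assumption: no resizing and no null keys, to keep the sketch short.
final class ChainedMapSketch<K, V> {
    // Every entry costs one Node object, plus the separate key and value objects it points to.
    private static final class Node<K, V> {
        final int hash;
        final K key;      // reference to the key object
        V value;          // reference to the value object
        Node<K, V> next;  // reference to the next node in the same bucket

        Node(int hash, K key, V value, Node<K, V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
    }

    private final Node<K, V>[] buckets;
    private int size = 0;

    @SuppressWarnings("unchecked")
    ChainedMapSketch(int capacity) {
        buckets = (Node<K, V>[]) new Node[capacity];
    }

    void put(K key, V value) {
        int hash = key.hashCode();
        int index = Math.floorMod(hash, buckets.length);
        for (Node<K, V> n = buckets[index]; n != null; n = n.next) {
            if (n.hash == hash && n.key.equals(key)) {
                n.value = value; // key already present: overwrite
                return;
            }
        }
        buckets[index] = new Node<>(hash, key, value, buckets[index]); // insert at list head
        size++;
    }

    V get(K key) {
        int hash = key.hashCode();
        for (Node<K, V> n = buckets[Math.floorMod(hash, buckets.length)]; n != null; n = n.next) {
            if (n.hash == hash && n.key.equals(key)) {
                return n.value;
            }
        }
        return null;
    }

    int size() {
        return size;
    }
}
```

Walking a bucket means following next pointers to nodes that may sit anywhere on the heap, which is the access pattern to keep in mind for the comparison that follows.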

However, this implementation approach has two disadvantages.

First, it has high storage overhead and a heavy GC burden. From the above diagram, we can see that the stored data values occupy only half of the HashMap’s storage space, and the other half is used to store references and pointers. This 50% storage overhead is quite significant. We can also see that each Key, Value, and linked list element in the diagram is a JVM object. Suppose we use HashMap to store one million data entries; the number of JVM objects is then at least three million. Since JVM garbage collection becomes less efficient as the number of objects grows, the implementation approach of java.util.HashMap is not friendly to GC.

Second, when data is accessed, the HashMap implemented in the standard library tends to lower the CPU cache hit rate, and therefore CPU utilization. The linked list data structure is friendly to write operations but inefficient for access operations. Storing data in linked lists is indeed flexible, allowing the JVM to make full use of scattered memory regions and improve memory utilization. However, when a full scan of the linked lists is performed, this scattered storage introduces a large number of random memory accesses. Compared with sequential access, random memory access significantly reduces the CPU cache hit rate.

So, how does Tungsten solve the above issues? Let’s look at storage overhead, GC efficiency, and CPU cache hit rate respectively.

First, Tungsten abandoned the linked list implementation and adopted an array plus memory page approach to implement HashMap. The elements stored in the array are the hash code and the Tungsten memory address, that is, the Object reference plus the offset, 128 bits in total. Tungsten HashMap uses this single 128-bit address to locate each data element, which reduces the storage overhead compared to the large number of linked list pointers in java.util.HashMap.

Second, the storage unit of Tungsten HashMap is the memory page, and the memory page is essentially a Java Object. One memory page can store multiple data entries. Therefore, compared to the HashMap in the standard library, using memory pages significantly reduces the number of objects required for storage. For example, if we need to store one million data entries, the standard library’s HashMap requires at least three million JVM objects to accommodate them, while Tungsten HashMap may only need a few or a dozen memory pages. The difference in the number of JVM objects they require is significant, clearly indicating that Tungsten’s implementation approach is more friendly to GC.

Moreover, the memory page is essentially a JVM object, and it uses contiguous space to store data. With the memory page plus an offset, we can accurately locate each data element. Therefore, when a full scan of HashMap data is required, the contiguous storage in memory pages changes the memory access pattern from random access to sequential access. Sequential memory access significantly improves CPU cache utilization, reduces the time the CPU spends stalled waiting for memory, and significantly improves CPU utilization.
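The array plus memory page idea can be sketched as follows. This is an illustration under assumptions, not Tungsten's actual hash map: the class PagedMapSketch is a hypothetical name, keys and values are plain long values, a single long[] stands in for the memory page, and an int offset stands in for the Tungsten address.

```java
import java.util.Arrays;

// Sketch of an "array + memory page" hash map for long keys and long values.
// Assumptions: one fixed-size page, no resizing, linear probing, capacity > 0.
final class PagedMapSketch {
    private final long[] page;      // the "memory page": contiguous [key, value] records
    private final int[] slotHash;   // per slot: hash code of the stored key
    private final int[] slotOffset; // per slot: offset of the record in the page, -1 if empty
    private int nextFree = 0;       // next unused position in the page

    PagedMapSketch(int capacity) {
        page = new long[capacity * 2];
        slotHash = new int[capacity * 2];   // twice as many slots as records,
        slotOffset = new int[capacity * 2]; // so probing always finds an empty slot
        Arrays.fill(slotOffset, -1);
    }

    void put(long key, long value) {
        int hash = Long.hashCode(key);
        int slot = Math.floorMod(hash, slotOffset.length);
        while (slotOffset[slot] != -1) {
            int offset = slotOffset[slot];
            if (slotHash[slot] == hash && page[offset] == key) {
                page[offset + 1] = value; // key already present: overwrite in place
                return;
            }
            slot = (slot + 1) % slotOffset.length;
        }
        if (nextFree + 2 > page.length) {
            throw new IllegalStateException("page is full");
        }
        page[nextFree] = key;       // append the record to the page
        page[nextFree + 1] = value;
        slotHash[slot] = hash;      // the slot stores only the hash code and the "address"
        slotOffset[slot] = nextFree;
        nextFree += 2;
    }

    long get(long key, long defaultValue) {
        int hash = Long.hashCode(key);
        int slot = Math.floorMod(hash, slotOffset.length);
        while (slotOffset[slot] != -1) {
            int offset = slotOffset[slot];
            if (slotHash[slot] == hash && page[offset] == key) {
                return page[offset + 1];
            }
            slot = (slot + 1) % slotOffset.length;
        }
        return defaultValue;
    }

    // A full scan walks the page front to back, i.e. sequential memory access.
    long sumValues() {
        long sum = 0;
        for (int i = 1; i < nextFree; i += 2) {
            sum += page[i];
        }
        return sum;
    }
}
```

In this sketch the map consists of three arrays no matter how many entries it holds, and a full scan never follows a pointer.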

How to understand WSCG? #

Next, let’s talk about WSCG. First of all, what is WSCG? To answer that, recall the second meaning of in-memory computing: within the same stage, the compute functions of multiple RDDs are combined into one, and this combined function is applied to the input data all at once. However, this combination is implemented with nested iterators. Take Stage0 of the Potato Workshop as an example: the fuse function in the diagram below is simply a nesting of the clean, slice, and bake functions, which are not truly fused into one function.

WSCG, by contrast, generates a piece of “handwritten code” based on the calling relationships between operators in the same stage, truly combining all computations into a single function.

What is the Volcano Iteration Model? #

So, is it really necessary to combine three function bodies into one function, or even generate a “handwritten code”? Isn’t the nested iterator function call sufficient? To be honest, the nested iterator is not enough. The reason is that the computation mode of nested iterators involves two operations, one is the random access of memory data, and the other is the virtual function call (next). Both of these operations will reduce the CPU’s cache hit rate and affect the CPU’s efficiency. This may sound abstract, so let’s look at a simple example.

Let’s say we have a table of citizens and we want to count the number of people in Beijing. The corresponding syntax tree is very simple: from left to right, it includes data scanning, filtering, projection, and aggregation. The syntax tree is first transformed into a Physical Plan by the “left brain” Catalyst optimizer, and then handed over for execution. Before Tungsten appeared, Spark used the Volcano Iteration Model (VI Model) to execute computations at runtime. Let’s briefly introduce it here.

The VI Model’s computation mode is based on the abstract syntax tree (AST) and provides a unified encapsulation for the computation of all operators (such as filtering and projection). All operators need to implement the iterator abstraction of the VI Model. In simple terms, all operators need to implement the hasNext and next methods. As a result, the VI Model is very flexible and highly extensible: any operator that implements the iterator abstraction can be added to the syntax tree and take part in the computation. In addition, to facilitate data exchange between operators, the VI Model also provides a unified encapsulation for the output of all operators.

So, if the above query uses the VI Model to execute computations, what steps does it need to go through? For each data entry in the data source, each operator in the syntax tree needs to complete the following steps:

  1. Read the output results of the upstream operator from memory as input data
  2. Invoke the hasNext and next methods to process the data according to the operator’s logic, such as filtering, projection, aggregation, etc.
  3. Output the processed results in a standardized form to memory for downstream operators to consume

Therefore, the interaction between any two operators involves the two operations mentioned earlier, random access of memory data and virtual function calls, and they are the culprits of low CPU utilization.
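The VI Model can be sketched with plain Java iterators. This is a toy illustration under assumptions, not Spark's operator classes: VolcanoSketch, Citizen, Filter, and Project are hypothetical names, rows are assumed to be non-null, and the final counting loop stands in for the Aggregate operator.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy Volcano-style pipeline: every operator is an Iterator that pulls rows from its child.
// Assumption: rows are never null, because null marks "no row buffered" in Filter.
final class VolcanoSketch {
    static final class Citizen {
        final long userId;
        final String region;

        Citizen(long userId, String region) {
            this.userId = userId;
            this.region = region;
        }
    }

    static final class Filter<T> implements Iterator<T> {
        private final Iterator<T> child;
        private final Predicate<T> condition;
        private T pending; // next row that passed the condition, if any

        Filter(Iterator<T> child, Predicate<T> condition) {
            this.child = child;
            this.condition = condition;
        }

        @Override
        public boolean hasNext() {
            while (pending == null && child.hasNext()) {
                T row = child.next(); // virtual call into the child operator
                if (condition.test(row)) {
                    pending = row;
                }
            }
            return pending != null;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T row = pending;
            pending = null;
            return row;
        }
    }

    static final class Project<I, O> implements Iterator<O> {
        private final Iterator<I> child;
        private final Function<I, O> projection;

        Project(Iterator<I> child, Function<I, O> projection) {
            this.child = child;
            this.projection = projection;
        }

        @Override
        public boolean hasNext() {
            return child.hasNext();
        }

        @Override
        public O next() {
            return projection.apply(child.next()); // materializes a new output row
        }
    }

    // Scan -> Filter -> Project, then count the rows that come out.
    static long countInRegion(List<Citizen> table, String region) {
        Iterator<Citizen> scan = table.iterator();
        Iterator<Citizen> filter = new Filter<Citizen>(scan, c -> c.region.equals(region));
        Iterator<Long> project = new Project<Citizen, Long>(filter, c -> c.userId);
        long count = 0;
        while (project.hasNext()) {
            project.next();
            count++;
        }
        return count;
    }
}
```

Every row that reaches the counting loop has passed through a hasNext and a next call on each operator, and each operator hands its output to the next one as an object in memory.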

What are the advantages of WSCG? #

The introduction of the WSCG mechanism by Tungsten is precisely to eliminate the computational overhead introduced by the VI Model. How is this accomplished? Next, let’s use the query of the citizen table as an example to intuitively feel the advantages of WSCG.

For the query statement above, WSCG combines the calling relationships between different operators in the abstract syntax tree (AST) to generate the “handwritten code” shown in the figure above. This handwritten code implements the end-to-end computation logic (filtering, projection, aggregation) all at once.

In this way, the handwritten code eliminates not only the operators themselves but also their virtual function calls, and there is no data exchange between different operators. The computation logic is applied to the data all at once. Moreover, each instruction in the code is explicit and can be loaded sequentially into CPU registers. The source data can also be loaded into the CPU cache in order, which greatly improves CPU efficiency.

Of course, the code that WSCG generates at runtime may differ in form from the handwritten code we mentioned here. However, this does not affect our understanding of the features and advantages of WSCG. You may ask, “Isn’t WSCG just runtime code refactoring?” Yes, fundamentally, the WSCG mechanism starts from a piece of poorly performing code and dynamically reconstructs a better-performing version of it at runtime.
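As a rough idea of what such handwritten code looks like for the citizen query, here is a small Java sketch. It is an assumption-laden illustration, not the code Spark generates: FusedLoopSketch and Citizen are hypothetical names, and the table is a plain in-memory array.

```java
// Sketch of "handwritten code" for the citizen query: one loop, no operators.
// Assumption: names and data layout are illustrative; Spark's generated code looks different.
final class FusedLoopSketch {
    static final class Citizen {
        final long userId;
        final String region;

        Citizen(long userId, String region) {
            this.userId = userId;
            this.region = region;
        }
    }

    static long countInRegion(Citizen[] table, String region) {
        long count = 0;
        for (Citizen row : table) {           // Scan: read rows in order
            if (region.equals(row.region)) {  // Filter: inlined as a plain if statement
                long userId = row.userId;     // Project: read only the required field
                count++;                      // Aggregate: partial count within the stage
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Citizen[] table = {
            new Citizen(1L, "Beijing"),
            new Citizen(2L, "Shanghai"),
            new Citizen(3L, "Beijing")
        };
        System.out.println(countInRegion(table, "Beijing"));
    }
}
```

There is no hasNext or next call between the filtering, projection, and counting steps, and no intermediate row object is handed from one step to another.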

How does WSCG dynamically generate code at runtime? #

So how does WSCG dynamically generate code at runtime?

Let’s take the example of the query on the citizen table we just mentioned. The syntax tree has four nodes from left to right: Scan, Filter, Project, and Aggregate. However, because Aggregate introduces Shuffle and Stage splitting, these four nodes will generate two stages. Because WSCG generates handwritten code within a stage, let’s focus on the stage composed of the first three operators: Scan, Filter, and Project.

In the previous lesson, we mentioned that before the Spark Plan is transformed into a Physical Plan, it applies a series of Preparation Rules. One important rule is the CollapseCodegenStages rule, which tries to generate “handwritten code” for each stage.

In general, the process of generating handwritten code is divided into two steps:

  • From parent nodes to child nodes, recursively call doProduce to generate the code framework
  • From child nodes to parent nodes, recursively call doConsume to fill in the computation logic of each operator in the framework

To make it less abstract, let’s take the example of the first stage to see the process of code generation visually.

First, a WholeStageCodeGen node is added above the top node of the stage, which is the Project. The WholeStageCodeGen node kicks off the entire code generation process by calling doExecute. doExecute recursively calls the doProduce function of each child node until it encounters a Shuffle Boundary. Here, Shuffle Boundary refers to the boundary of the Shuffle, which is either the data source or the output of the previous stage. The doProduce function called in the leaf node (Scan) first generates the framework of the handwritten code, as shown in the blue part on the right side of the figure.

Then, the doProduce in Scan will recursively call the doConsume function of each parent node in reverse. In the process of executing the doConsume function, different operators convert the relational expression into Java code and then embed this code into the code framework like “fill in the blanks”. For example, the if statement generated by the orange doConsume function contains the condition for judging if the region is Beijing, and the purple doConsume function generates Java code to obtain the required field userId.
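The two recursions can be imitated with a toy code generator. This is a sketch under heavy assumptions: the Operator interface, its produce and consume methods, and the emitted text (including Row, input, and emit) are all hypothetical and much simpler than Spark's actual doProduce and doConsume, which have different signatures.

```java
// Toy produce/consume code generator that emits Java-like source text as a String.
// Assumption: all class, method, and emitted names are hypothetical, not Spark's API.
final class CodegenSketch {
    abstract static class Operator {
        final Operator child; // upstream operator that supplies rows (null for the leaf)
        Operator parent;      // downstream operator that consumes rows (null for the top)

        Operator(Operator child) {
            this.child = child;
            if (child != null) {
                child.parent = this;
            }
        }

        abstract String produce();           // top-down: ask the child to produce rows
        abstract String consume(String row); // bottom-up: emit this operator's logic
    }

    // Leaf node: assumed to always have a parent operator above it.
    static final class Scan extends Operator {
        Scan() {
            super(null);
        }

        @Override
        String produce() {
            // The leaf generates the code framework, then lets its parents fill it in.
            return "for (Row row : input) {\n" + parent.consume("row") + "}\n";
        }

        @Override
        String consume(String row) {
            throw new UnsupportedOperationException("Scan has no child to consume from");
        }
    }

    static final class Filter extends Operator {
        private final String condition;

        Filter(Operator child, String condition) {
            super(child);
            this.condition = condition;
        }

        @Override
        String produce() {
            return child.produce();
        }

        @Override
        String consume(String row) {
            return "  if (" + condition + ") {\n" + parent.consume(row) + "  }\n";
        }
    }

    static final class Project extends Operator {
        private final String field;

        Project(Operator child, String field) {
            super(child);
            this.field = field;
        }

        @Override
        String produce() {
            return child.produce();
        }

        @Override
        String consume(String row) {
            String body = "    long " + field + " = " + row + "." + field + ";\n";
            // The top operator of the stage emits the result instead of calling a parent.
            return parent == null ? body + "    emit(" + field + ");\n" : body + parent.consume(row);
        }
    }

    // Plays the role of the node placed above the stage: start the top-down recursion.
    static String generate(Operator top) {
        return top.produce();
    }

    public static void main(String[] args) {
        Operator plan = new Project(
            new Filter(new Scan(), "row.region.equals(\"Beijing\")"), "userId");
        System.out.print(generate(plan));
    }
}
```

Calling generate on the Project node walks down to Scan through produce, and the generated text is then assembled on the way back up through consume, with each operator contributing its own fragment.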

In this way, Tungsten uses the CollapseCodegenStages rule to transform the Spark Plan output by Catalyst into a piece of “handwritten code” through two rounds of recursive calls, and delivers this handwritten code to the DAGScheduler. After receiving the code, the DAGScheduler coordinates its two subordinates, TaskScheduler and SchedulerBackend, to complete distributed task scheduling.

Summary #

Tungsten is the “right brain” of Spark SQL, and understanding its features and advantages is crucial for performance tuning in Spark SQL. Specifically, we can start with two improvements to its kernel engine: data structure design and Whole-Stage Code Generation (WSCG).

In terms of data structure, we need to understand two improvements made by Tungsten.

Firstly, Tungsten designed the UnsafeRow binary byte sequence to replace the storage method of JVM objects. This not only improves storage efficiency but also reduces the number of objects required to store data records, thereby improving garbage collection (GC) efficiency.

Secondly, in order to manage on-heap and off-heap memory uniformly, Tungsten designed a 128-bit memory address, with the first 64 bits storing the object reference and the last 64 bits storing the offset address.

In terms of on-heap memory management, based on the design mechanism of Tungsten memory addresses and memory pages, data structures implemented by Tungsten (such as HashMap) use contiguous space to store data entries. Accessing contiguous memory is beneficial for improving CPU cache hit rate and therefore CPU efficiency. Since memory pages are essentially Java Objects, the memory page management mechanism often greatly reduces the number of objects required to store data, making it very GC-friendly.

Regarding Tungsten’s WSCG, we need to understand its concept and advantages.

Firstly, WSCG refers to generating a piece of “handwritten code” based on the invocation relationships between operators within the same stage, in order to merge all computations into a single function. Essentially, the WSCG mechanism works by dynamically reconstructing better-performing code from poorly performing code at runtime.

More importantly, the “handwritten code” solves two core pain points of the VI computation model: frequent virtual function calls between operators, and memory random access introduced by data exchange between operators. Each instruction in the handwritten code is explicit and can be sequentially loaded into CPU registers, and the source data can be loaded into CPU cache levels sequentially as well. As a result, CPU cache hit rate and efficiency are greatly improved.

Daily Exercise #

  1. What improvements do you think Tungsten has made in terms of data structure for sorting operations?
  2. What are the differences and connections between Expression Codegen and Whole Stage Codegen?

Looking forward to seeing your thoughts and answers in the comments, see you in the next lecture!