06 Storage System - Space for Time or Time for Space #

Hello, I am Wu Lei.

Today, we are going to learn about the storage system in Spark. Just like the scheduling system we learned in the previous lesson, the storage system is also one of the infrastructure components of the Spark distributed computing engine.

You might wonder, “In my daily development work, besides implementing business logic, do I really need to care about something this low-level?” Indeed, the storage system seems far removed from developers. However, once we look at who the storage system actually serves, you may well change your mind.

Next, let’s first take a look at who the Spark storage system serves, and then explore its important components and how it works, so you can fully understand the storage system at once.

Who does the Spark storage system serve? #

The Spark storage system is used to store three types of data: RDD cache, Shuffle intermediate files, and broadcast variables. Let’s discuss them one by one.

RDD cache refers to materializing an RDD into memory or onto disk as a cache. For RDDs that are expensive to compute and frequently accessed, caching brings two benefits: first, by truncating the DAG, it reduces the computational cost of failure retries; second, by serving reads from the cached content, it avoids repeated computation from scratch, improving the end-to-end execution performance of the job.
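As a quick illustration, here is how an application typically opts in to RDD caching. This is a minimal, self-contained sketch; the data and application name are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("cache-demo").setMaster("local[*]"))

// Cache an expensive, frequently reused RDD in memory.
val hotRdd = sc.parallelize(1 to 1000000).map(i => (i % 100, i))
hotRdd.cache()  // shorthand for persist(StorageLevel.MEMORY_ONLY)
hotRdd.count()  // the first action materializes the cache
hotRdd.count()  // later actions read the cache instead of recomputing

// Alternative: spill to disk when memory is tight (an RDD's storage
// level can only be assigned once, so choose one or the other):
// hotRdd.persist(StorageLevel.MEMORY_AND_DISK)
```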

As for Shuffle intermediate files, we have to touch on the topic of Shuffle itself. In many scenarios, Shuffle is the performance bottleneck, and resolving the problems it introduces often yields immediate performance gains. Therefore, you need to pay special attention to everything related to Shuffle.

We will discuss the working principle of Shuffle in detail later. For now, let’s briefly look at how Shuffle is computed. The computation can be divided into two stages:

  • Map stage: Shuffle writer writes intermediate data to local disk according to the partition rules of the Reducers.
  • Reduce stage: Shuffle reader downloads data partitions from various nodes and performs aggregation calculations as needed.
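To make the two stages concrete, here is a minimal shuffle-inducing job, reusing the sc from the earlier sketch. reduceByKey repartitions data by key, so its map side writes intermediate files to local disk and its reduce side fetches and aggregates them:

```scala
// reduceByKey triggers a shuffle: map tasks write partitioned intermediate
// files locally; reduce tasks pull their partitions and aggregate.
val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
counts.collect()  // e.g. Array((a,3), (b,2), (c,1))
```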

Shuffle intermediate files are the output of the Shuffle Map stage, temporarily stored on local disks in the form of files. In the Shuffle Reduce stage, the Reducers pull these intermediate files over the network for aggregation calculations such as summation and counting. For the Reducers to pull their own portions of the intermediate data, they must know on which nodes, and at which locations, the data is stored. This critical metadata is saved and maintained by the Spark storage system. So you see, Shuffle cannot be accomplished without the storage system.

Lastly, let’s talk about broadcast variables. In daily development, broadcast variables are often used to distribute small, frequently accessed datasets within the cluster. With the help of the storage system, a broadcast variable keeps a full copy of its data within each Executor process. This way, all computing tasks running in the same Executor can share that full copy at the process-local locality level.
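A minimal sketch of the pattern, again reusing sc and with placeholder data; sc.broadcast ships the lookup table once per Executor rather than once per task:

```scala
// Broadcast a small lookup table; all tasks in an Executor share one copy.
val countryNames = Map("CN" -> "China", "US" -> "United States")
val bcNames = sc.broadcast(countryNames)

val codes = sc.parallelize(Seq("CN", "US", "CN"))
codes.map(code => bcNames.value.getOrElse(code, "Unknown")).collect()
// Array(China, United States, China)
```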

In summary, these three service objects are powerful “levers” for optimizing the performance of Spark applications, and they have close connections with the storage system. Therefore, to effectively use these three optimization techniques, we must have a sufficient understanding of the storage system.

What are the basic components of a storage system? #

Similar to the scheduling system, the Spark storage system is a compound system that includes a number of components such as BlockManager, BlockManagerMaster, MemoryStore, DiskStore, and DiskBlockManager.

However, among these many components there is one in charge: BlockManager. It is the most important component, responsible for the unified management and coordination of local data access and cross-node data transfer on the Executor side. How should we understand this? We can look at it from two perspectives.

  • Externally, BlockManager communicates with the BlockManagerMaster on the Driver side: it regularly reports local data metadata to the BlockManagerMaster and occasionally queries the global data storage status on demand. In addition, the BlockManagers of different Executors push and pull data blocks across nodes in a Server/Client fashion.

  • Internally, BlockManager combines the functionalities of internal components in the storage system to achieve data storage and retrieval, as well as data transmission.

So how does BlockManager store RDD caches, shuffle intermediate files, and broadcast variables? The Spark storage system provides two storage abstractions: MemoryStore and DiskStore. BlockManager uses them to manage data storage and retrieval in memory and on disk, respectively.

Among the three, a broadcast variable’s full data is stored within the Executor process, so it is managed by the MemoryStore. Shuffle intermediate files are written to the nodes’ local disks, so writing and reading these files goes through the DiskStore. RDD caching is slightly more complex: since it supports both in-memory and on-disk caching, the storage method depends on the configured level. Data cached in memory is handled by the MemoryStore, while data cached on disk is managed by the DiskStore.
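Conceptually, the routing works like the sketch below. This is a simplified illustration of the decision, not Spark’s actual code, though the useMemory and useDisk flags on StorageLevel are real:

```scala
import org.apache.spark.storage.StorageLevel

// Simplified illustration: which store handles a block under a given level.
def chooseStores(level: StorageLevel): Seq[String] = Seq(
  if (level.useMemory) Some("MemoryStore") else None,
  if (level.useDisk) Some("DiskStore") else None
).flatten

chooseStores(StorageLevel.MEMORY_ONLY)     // Seq(MemoryStore)
chooseStores(StorageLevel.DISK_ONLY)       // Seq(DiskStore)
chooseStores(StorageLevel.MEMORY_AND_DISK) // Seq(MemoryStore, DiskStore)
```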

With MemoryStore and DiskStore, we have answered the question of “where the data is stored”. But in what “format” should the data be kept in MemoryStore and DiskStore? The Spark storage system supports two storage formats: object values and byte arrays, and they can be converted into each other. Serialization encodes object values into byte arrays, and deserialization restores byte arrays back into the original object values.

To put it simply, serialized byte arrays are like flat-pack boards bought from IKEA, while object values are the tables, chairs, and stools assembled from those boards according to the instructions. Obviously, the advantage of storing data as object values is that they are ready to use and easy to inspect, but they take up more storage space. By comparison, serialized byte arrays use space far more efficiently; however, whenever you need to access the data objects inside, you must pay the cost of deserialization first.

Therefore, there is a trade-off between object values and byte arrays: the classic “space-time trade-off”. Which to choose depends on the application scenario. The core principle is: if you want to save space, prefer byte arrays; if you want the fastest possible access to the objects, object values are more direct. Note that this dilemma exists only in the MemoryStore: the DiskStore can only hold serialized byte arrays, since anything written to disk must be serialized first.
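In application code, this trade-off surfaces directly in the storage level you pick. A brief sketch, reusing sc; the comments describe typical tendencies, not guarantees:

```scala
import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 1000000).map(i => (i, i.toString))

// Deserialized object values: fastest access, largest memory footprint.
data.persist(StorageLevel.MEMORY_ONLY)

// Serialized byte arrays: more compact, but every access pays a
// deserialization cost. (A storage level can only be set once per RDD,
// so in practice you would pick one of the two.)
// data.persist(StorageLevel.MEMORY_ONLY_SER)
```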

Understanding MemoryStore through RDD caching #

Now that we know the core components of the storage system, let’s continue discussing how the MemoryStore and DiskStore manage memory and disk data.

As mentioned earlier, the MemoryStore supports storing both object values and byte arrays, and they are encapsulated using the MemoryEntry data abstraction.

MemoryEntry has two implementation classes: DeserializedMemoryEntry and SerializedMemoryEntry, which are used to encapsulate the original object values and the serialized byte arrays, respectively. DeserializedMemoryEntry uses Array[T] to store object value sequences, where T is the object type, while SerializedMemoryEntry uses ByteBuffer to store the serialized byte sequences.

Thanks to MemoryEntry’s unified encapsulation of object values and byte arrays, MemoryStore can store and access all data blocks through a single efficient data structure: a linked hash dictionary, LinkedHashMap[BlockId, MemoryEntry], whose Key is a BlockId and whose Value is a MemoryEntry. In this dictionary, each Block corresponds to one MemoryEntry, which may be either a DeserializedMemoryEntry or a SerializedMemoryEntry. With this dictionary, we can locate a MemoryEntry by its BlockId, enabling fast storage and retrieval of data blocks.
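To make the shape of these abstractions concrete, here is a stripped-down sketch modeled loosely on Spark’s internals. The real classes carry extra fields (sizes, memory modes, class tags) and use a dedicated BlockId class rather than a string, so treat this only as a mental model:

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.LinkedHashMap

// Simplified mental model of the MemoryStore's core data structures.
sealed trait MemoryEntry[T]
case class DeserializedMemoryEntry[T](values: Array[T]) extends MemoryEntry[T]
case class SerializedMemoryEntry[T](buffer: ByteBuffer) extends MemoryEntry[T]

// Insertion-ordered map: eviction can walk entries oldest-first (LRU-style).
val entries = LinkedHashMap.empty[String, MemoryEntry[_]]
entries("rdd_0_0") = DeserializedMemoryEntry(Array(1, 2, 3))
entries("rdd_0_1") = SerializedMemoryEntry(ByteBuffer.wrap(Array[Byte](1, 2)))
```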

With so many concepts and similar names, it can be overwhelming. But don’t worry, let’s take the example of RDD caching to see how the storage system uses these data structures to cache the data entities encapsulated by RDD into memory.

In the context of RDDs, we usually speak of data partitions (splits) when describing distributed data; in the context of the storage system, the basic unit of storage is the data block. Logically, RDD data partitions correspond one-to-one with storage-system blocks: each RDD data partition is materialized into exactly one block in memory or on disk.
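This one-to-one mapping is visible in how cached RDD blocks are named: the identifier encodes the RDD id and the partition index. The sketch below is a toy reconstruction of that naming convention:

```scala
// Spark names cached RDD blocks "rdd_<rddId>_<splitIndex>", one per partition.
def rddBlockName(rddId: Int, splitIndex: Int): String =
  s"rdd_${rddId}_$splitIndex"

rddBlockName(42, 0) // "rdd_42_0": partition 0 of RDD 42
rddBlockName(42, 7) // "rdd_42_7": partition 7 of RDD 42
```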

Therefore, to summarize, caching an RDD boils down to materializing the iterator that computes the RDD’s data, in the following three steps.

Since we want to cache the actual data, we first need to expand the RDD’s iterator into concrete data values. Therefore, the first step uses the putIteratorAsValues or putIteratorAsBytes method to expand the RDD iterator into data values, temporarily storing them in a data structure called ValuesHolder. This step is commonly referred to as “unrolling”.

The second step is to convert the temporary ValuesHolder into a MemoryEntry. We can directly call the toArray or toByteBuffer operation on the ValuesHolder to obtain the MemoryEntry data structure. Note that this conversion involves no memory copying and no extra memory overhead, which is why Spark officially calls this step “transfer from unroll memory to storage memory”.

In the third step, the MemoryEntry containing the RDD data values and its corresponding BlockId are stored together in the linked hash dictionary, with the BlockId as the Key and a reference to the MemoryEntry as the Value. Therefore, LinkedHashMap[BlockId, MemoryEntry] caches the metadata about data storage, while MemoryEntry is the actual storage unit holding the RDD data entities. In other words, it is not the hash dictionary itself that occupies a large amount of memory, but the MemoryEntry units.

In summary, RDD data partitions, blocks, and MemoryEntry correspond one-to-one. When all RDD data partitions are materialized into MemoryEntry and all (Block ID, MemoryEntry) pairs are recorded in the LinkedHashMap dictionary, the RDD completes the process of caching data into memory.
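Putting the three steps together, the heavily simplified sketch below shows the control flow. The helper name cachePartition and the use of ArrayBuffer as the “ValuesHolder” are invented for illustration; Spark’s real putIteratorAsValues interleaves unrolling with memory accounting and may fail partway through:

```scala
import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
import scala.reflect.ClassTag

// Illustration only: the three steps of caching one RDD partition in memory.
def cachePartition[T: ClassTag](blockId: String,
                                values: Iterator[T],
                                entries: LinkedHashMap[String, Array[_]]): Unit = {
  // Step 1 ("unrolling"): expand the iterator into a temporary holder.
  val valuesHolder = ArrayBuffer.empty[T]
  values.foreach(valuesHolder += _)

  // Step 2: turn the holder into the long-lived entry (cf. toArray on the
  // real ValuesHolder; in Spark this transfer copies no data).
  val entry: Array[T] = valuesHolder.toArray

  // Step 3: record the (BlockId, entry) pair in the linked hash dictionary.
  entries(blockId) = entry
}
```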

You might ask, “What if the memory space is not enough to hold the entire RDD?” The answer is simple: forcing a large RDD into limited memory is not a wise move. In that case, Spark evicts blocks, together with their MemoryEntry instances, from the dictionary one by one according to an LRU (Least Recently Used) policy. After all, once data has to be repeatedly unrolled, materialized, and swapped in and out, the overhead far outweighs whatever speedup the cache contributes to RDD access.

Understanding DiskStore through Shuffle #

Compared to MemoryStore, DiskStore is much simpler, because it does not need as many intermediate data structures for storing and retrieving data. In essence, data storage and retrieval in DiskStore is a conversion between byte sequences and disk files: the putBytes method writes byte sequences into disk files, and the getBytes method reads file contents back out as data blocks.

However, to convert between data blocks and files, certain metadata is essential, such as the correspondence between data blocks and files, and the file paths. While MemoryStore uses a linked hash dictionary to maintain similar metadata, the clever DiskStore does not maintain this metadata directly; instead, it relies on a capable assistant: DiskBlockManager.

The main responsibility of DiskBlockManager is to record the correspondence between logical data blocks (referred to as Blocks) and physical files in the disk file system, with each Block corresponding to a disk file. Similarly, each disk file has a corresponding Block ID, just like each item on a shelf has a unique ID identifier.

When DiskBlockManager is initialized, it first creates file directories on disk at the locations given by the configuration option spark.local.dir. It then creates subdirectories under each directory specified by spark.local.dir; the number of subdirectories is controlled by the configuration option spark.diskStore.subDirectories, which defaults to 64. All of these directories are used to store the data files materialized through DiskStore, such as RDD cache files and Shuffle intermediate files.
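The sketch below mimics how a block’s file location can be derived from its name under this two-level directory layout. It is modeled on Spark’s hashing scheme but simplified; the exact hash function and directory bookkeeping in DiskBlockManager differ:

```scala
import java.io.File

// Simplified: map a block's file name to <localDir>/<subDir>/<fileName>.
def blockFile(localDirs: Array[File],
              subDirsPerLocalDir: Int,
              fileName: String): File = {
  val hash = fileName.hashCode & Int.MaxValue                   // non-negative hash
  val dirId = hash % localDirs.length                           // pick a local dir
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir // pick a subdir
  new File(new File(localDirs(dirId), "%02x".format(subDirId)), fileName)
}

val dirs = Array(new File("/tmp/spark-local")) // placeholder for spark.local.dir
blockFile(dirs, 64, "shuffle_0_1_0.data")
// e.g. /tmp/spark-local/<subdir>/shuffle_0_1_0.data
```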

Next, let’s take Shuffle intermediate files as an example to explain the interaction between DiskStore and DiskBlockManager.

By default, Spark uses SortShuffleManager to manage data distribution between stages. During the Shuffle write process, three kinds of files are produced: temp_shuffle_XXX, shuffle_XXX.data, and shuffle_XXX.index. Data files store the partitioned data, obtained by merging the temp files, while index files record the offsets of the different partitions within the data file. “Shuffle intermediate files” refers specifically to the data files and index files; the temp files are temporary disk files that are deleted in the end.
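To see why the index file matters, here is a toy illustration of how a Reducer’s slice is located inside a data file. The offset values are invented; in reality the index file stores one offset per partition boundary:

```scala
// Toy example: index entries are cumulative byte offsets into the data file.
// Partition i occupies bytes [offsets(i), offsets(i + 1)) of the data file.
val offsets = Array(0L, 1024L, 4096L, 5120L) // 3 partitions, invented sizes

def partitionRange(reduceId: Int): (Long, Long) =
  (offsets(reduceId), offsets(reduceId + 1))

partitionRange(1) // (1024, 4096): Reducer 1 reads 3072 bytes at offset 1024
```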

At different points during Shuffle write, the Shuffle manager calls DiskStore’s putBytes method through BlockManager to write data blocks into files. The files themselves are created by DiskBlockManager, and each file name is the Block ID specified in the putBytes call. These file names start with either “temp_shuffle” or “shuffle”, and the files are saved in the subdirectories under the spark.local.dir directories.

In the Shuffle read stage, Shuffle manager once again calls the getBytes method of DiskStore through BlockManager to read data files and index files, converting the file contents into data blocks. Finally, these data blocks will be distributed over the network to the Reducer side for aggregation and computation.

Summary #

Mastering the storage system is key to optimizing Spark performance, and you can approach it in three steps.

The first step is to understand the service objects of the storage system, which are RDD caching, shuffle, and broadcast variables.

  • RDD caching: RDDs with high computation cost and frequent access can be materialized in memory or disk as caches. This can avoid the overhead of frequent DAG backtracking and effectively improve end-to-end execution performance.
  • Shuffle: The location information of intermediate files in the shuffle operation is saved and maintained by the Spark storage system. Without the storage system, shuffle cannot be performed.
  • Broadcast variables: With the help of the storage system, broadcast variables can be stored within the scope of Executors processes, allowing tasks to share the full data carried by broadcast variables at the Process local level.

The second step is to understand two important components of the storage system: MemoryStore and DiskStore. MemoryStore is used to manage data storage and retrieval in memory, while DiskStore is used to manage data storage and retrieval on disk.

For the three service objects of the storage system, broadcast variables are managed by MemoryStore, the disk storage and access of shuffle intermediate files are controlled by DiskStore, and RDD caching may use both components because it supports both memory caching and disk caching.

Finally, we need to understand the working principles of MemoryStore and DiskStore.

MemoryStore supports object values and byte arrays, which are encapsulated using the MemoryEntry data abstraction. There is a trade-off between object values and byte arrays, known as “space-for-time” and “time-for-space”. The choice between the two depends on specific application scenarios.

DiskStore relies on the mapping between data blocks and disk files, maintained by DiskBlockManager, to perform the conversion between byte sequences and disk files.

Daily Practice #

  1. Based on the process of storing RDD data in MemoryStore, can you deduce the process of accessing the cached content of the RDD through the getValues/getBytes method via MemoryStore?
  2. Referring to the process of caching RDD storage, can you deduce the process of storing broadcast variables in MemoryStore?

Looking forward to seeing your thoughts and discussions in the comments section. See you in the next lecture!