19 Network Perspective - How to Effectively Reduce Network Overheads #

Hello, I am Wu Lei.

Of the hardware resources we try to keep in balance (CPU, memory, disk, and network), the network is undoubtedly the slowest. The gap is most evident in latency.

The following figure compares the latency of different hardware resources, with the nanosecond as the smallest unit. Nanoseconds are hard to picture, so for the sake of comparison I have scaled one nanosecond up to one second and scaled every other latency by the same factor. On that scale, network latency is measured in days!

Therefore, to keep hardware resources in balance, minimizing network overhead is an essential part of performance tuning. In today’s lecture, I will follow the order in which data moves through the system (data reading, data processing, and data transmission) and summarize how to effectively reduce network overhead at each stage.

Data I/O #

For the vast majority of applications, the first step is to read data from a distributed file system. Spark supports a wide range of data sources, including various storage formats and systems.

With so many storage formats and external storage systems, there are countless combinations, and each combination has its own use cases. So, how can we determine when network overhead will occur? In fact, regardless of the file format or storage system, whether accessing a data source introduces network overhead depends on the locality of the task and data, that is, the locality level of the task, which has four levels:

  • PROCESS_LOCAL: The task and the data are in the same JVM process.
  • NODE_LOCAL: The task and the data are on the same compute node, with data possibly on disk or in another JVM process.
  • RACK_LOCAL: The task and the data are not on the same node, but on the same physical rack.
  • ANY: The task and the data are across racks or even across data centers (DC).

Based on the definition, we can easily determine whether disk or network overhead is introduced for tasks at different locality levels, as shown in the table below. From the table, it is clear that data access efficiency decreases from PROCESS_LOCAL to ANY. During the data reading phase, when the data is not yet loaded into memory, tasks cannot be scheduled at the PROCESS_LOCAL level. Therefore, the optimal level we can schedule at this stage is NODE_LOCAL.
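The relationship between locality level and overhead can be written down as a small lookup, derived only from the four definitions above. This is a sketch for illustration: `LocalitySketch`, its case objects, and the two flags are hypothetical names, not Spark's own locality type, and `mayUseDisk` reflects my reading that anything not already in the task's own JVM may have to be read from disk.

```scala
// Sketch only: a plain-Scala model of the four locality levels.
// It is not Spark's internal TaskLocality type.
object LocalitySketch {
  sealed abstract class Locality(val mayUseDisk: Boolean, val usesNetwork: Boolean)

  // Task and data share one JVM process: no disk read, no network.
  case object ProcessLocal extends Locality(mayUseDisk = false, usesNetwork = false)
  // Same node: data may come from local disk, but not over the network.
  case object NodeLocal extends Locality(mayUseDisk = true, usesNetwork = false)
  // Same rack, different node: data must travel over the network.
  case object RackLocal extends Locality(mayUseDisk = true, usesNetwork = true)
  // Across racks or data centers: data must travel over the network.
  case object AnyLocality extends Locality(mayUseDisk = true, usesNetwork = true)

  // Ordered from the most efficient data access to the least efficient.
  val fastestToSlowest: Seq[Locality] =
    Seq(ProcessLocal, NodeLocal, RackLocal, AnyLocality)

  def avoidsNetwork(level: Locality): Boolean = !level.usesNetwork
}
```

Filtering `LocalitySketch.fastestToSlowest` with `avoidsNetwork` leaves only `ProcessLocal` and `NodeLocal`, which is why NODE_LOCAL is the level to aim for while data is still being read.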

According to the definition of NODE_LOCAL, at this level, the destination node for scheduling must have the data partitions required by the Spark computation task on disk. This means that in terms of cluster deployment, the Spark cluster is physically tightly coupled with the external storage system. In contrast, if the Spark cluster and storage cluster are physically separated, the task locality level can only degrade to RACK_LOCAL or even ANY, in order to obtain the required data partitions via the network.

Therefore, whether Spark with HDFS or Spark with MongoDB incurs network overhead depends entirely on how the two are deployed. With tight physical coupling, Spark obtains data at the NODE_LOCAL level and pays disk I/O rather than network overhead. If the Spark cluster and storage cluster are physically separated, network overhead cannot be avoided.

Physical separation also affects the efficiency of data writing. When data processing is complete and the results need to be written to an external storage system, a tightly coupled deployment can save the data partitions to the local node and avoid network overhead.

It is worth mentioning that in an enterprise’s private data center, it is easier to customize the deployment mode of the cluster, and a tightly coupled approach is often used to improve data access efficiency. However, in a public cloud environment, the compute cluster is often physically separated from the storage system, so the reading of data sources can only be done through the network.

Based on the analysis above, for business scenarios where data I/O accounts for a high proportion, we can plan the deployment mode of the cluster from the beginning to be prepared.

Data Processing #

After data reading is complete, it enters the data processing phase. In the process of data processing, what techniques can help reduce network overhead?

Less is more #

Speaking of network overhead in data processing, I guess the first operation that comes to mind is Shuffle. As the “performance bottleneck” of most computing scenarios, Shuffle is indeed the main culprit of network overhead. Following the development principle of “less is more”, we naturally want to find ways to avoid Shuffle. In data association scenarios, the best way to eliminate Shuffle is to convert Shuffle Joins into Broadcast Joins. The tuning techniques are explained in detail in the lectures on broadcast variables. Creating a broadcast variable also involves network transmission, but this is a matter of choosing the lesser of two evils: compared with the network overhead of Shuffle, the overhead of broadcast variables is negligible.
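As a reminder of what the conversion looks like in the DataFrame API, here is a minimal sketch. It assumes one side of the join is small enough to be broadcast. The object name `BroadcastJoinSketch`, the parameters `facts` and `dims`, and the join key are hypothetical; `broadcast` is the hint function from `org.apache.spark.sql.functions`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinSketch {
  // `facts` stands for the large table and `dims` for the small one.
  // The broadcast hint asks Spark to ship `dims` to every executor,
  // so the rows of `facts` do not need to be shuffled for the join.
  def joinWithBroadcast(facts: DataFrame, dims: DataFrame, key: String): DataFrame =
    facts.join(broadcast(dims), Seq(key), "inner")
}
```

The hint is only a request, so it is worth checking the physical plan with `explain()` to confirm which join strategy Spark actually chose.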

Following the principle of “less is more”, eliminating Shuffle is undoubtedly the best choice. If Shuffle really cannot be avoided, we should use Map-side aggregation wherever possible to reduce the amount of data that has to be distributed over the network. The typical practice is to replace groupByKey with reduceByKey or aggregateByKey. Now that the RDD API is used less and less, this particular technique is largely obsolete. The idea of Map-side aggregation, however, is not outdated. Why do I say that? Let me explain in detail through a small example.
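For readers who have not used the RDD API, the classic replacement looks roughly like this. It is a sketch built on a hypothetical word count; `MapSideAggregationSketch` and its method names are made up for illustration, while groupByKey, mapValues, and reduceByKey are the standard pair RDD operators.

```scala
import org.apache.spark.rdd.RDD

object MapSideAggregationSketch {
  // Every (word, 1) pair is sent across the network as is,
  // and the sum is computed only on the reduce side.
  def countWithGroupByKey(words: RDD[String]): RDD[(String, Int)] =
    words.map(w => (w, 1)).groupByKey().mapValues(_.sum)

  // Pairs are first summed inside each map task, so at most one
  // record per key per partition has to be shuffled.
  def countWithReduceByKey(words: RDD[String]): RDD[(String, Int)] =
    words.map(w => (w, 1)).reduceByKey(_ + _)
}
```

Both methods return the same counts; the difference lies only in how much data crosses the network during Shuffle.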

In most 2C (To Consumer) business scenarios, we need to build user profiles. Our small example is one piece of a “user profile” job: given a user table, we need to compute an interest list for each user group and ensure that the list is unique, that is, it contains no duplicate interest items. The schema of the user table is shown in the table below.

To obtain the interest list for each group, we should first group by groupId, collect the interest lists of all users in the group, then flatten the lists, and finally remove duplicates to get a unique interest list. The idea is quite simple. Let’s take a look at the first version of the implementation code.

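import org.apache.spark.sql.functions.{array_distinct, col, collect_list, flatten}

// Path to the user table; left unspecified here, replace ??? with your own path
val filePath: String = ???
val df = spark.read.parquet(filePath)
// Collect all interest lists per group, flatten them, then remove duplicates
df.groupBy("groupId")
  .agg(array_distinct(flatten(collect_list(col("interestList")))))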

In this version, collect_list, flatten, and array_distinct are used to collect, flatten, and deduplicate the interest lists, in that order. The code fully implements the business logic. However, whenever we see “collect”-like operations such as groupByKey and collect_list, we should instinctively be on the alert, because these operations distribute the complete data at its finest granularity across the network. Compared with other operators, they introduce the largest network overhead.

So can we move this work to the Map side in advance to reduce the amount of data distributed in Shuffle? Of course we can. For the collection operation in this example, we can deduplicate the data on the Map side as early as possible, and then look through the DataFrame API to see whether there is a Map-side aggregation operator corresponding to collect_list.
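One possible rewrite along these lines is sketched below. It is my own reading of the idea rather than a reference solution: each interest list is exploded into single items, and collect_set then gathers the distinct items per group. The sketch assumes Spark plans collect_set with partial aggregation, so duplicates within a map task are dropped before Shuffle. `InterestListSketch` and the output column name `interests` are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_set, explode}

object InterestListSketch {
  // Expects the columns groupId and interestList (an array column).
  // Note: explode drops rows whose interestList is null or empty, and
  // collect_set does not guarantee the order of the resulting items.
  def uniqueInterestsPerGroup(users: DataFrame): DataFrame =
    users
      .select(col("groupId"), explode(col("interestList")).as("interest"))
      .groupBy("groupId")
      .agg(collect_set(col("interest")).as("interests"))
}
```

Compared with the first version, the result contains the same distinct items per group, but the order may differ, so this rewrite only fits when the order of interests does not matter.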

Therefore, in the data processing phase, we should follow the principle of “less is more” in development and actively reduce the network overhead in the calculation process. For data association scenarios, we should try to convert Shuffle Joins into Broadcast Joins to eliminate Shuffle. If it is really impossible to avoid Shuffle, we can use more Map-side aggregation in calculations to reduce the amount of data to be distributed over the network.

In addition to Shuffle, there is another operation that distributes data over the network. It is well hidden and easy to overlook: multi-replica RDD caching.

For example, in real-time stream processing scenarios, the system’s high availability requirements may lead you to choose storage levels ending in “_2” or even “_3”, which cache multiple copies of the data in memory and on disk. When the number of replicas is greater than 1, local data shards are copied to other nodes over the network, which introduces network overhead. The change looks like nothing more than a suffix on the storage level name, but it brings a lot of unexpected overhead at runtime. Therefore, if your application does not have strict high availability requirements, I recommend that you avoid multi-replica RDD caching where possible.
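In code, the difference is a single constant. The sketch below contrasts a single-replica level with its “_2” counterpart; `CachingSketch` and its method names are hypothetical, while the StorageLevel constants come from `org.apache.spark.storage.StorageLevel`.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CachingSketch {
  // One replica: cached blocks stay on the executor that computed them.
  def cacheLocally[T](rdd: RDD[T]): RDD[T] =
    rdd.persist(StorageLevel.MEMORY_AND_DISK)

  // Two replicas: every cached block is also copied to a second node,
  // and that copy travels over the network.
  def cacheReplicated[T](rdd: RDD[T]): RDD[T] =
    rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
}
```

The replica count of any level can be read from its `replication` field, which makes it easy to audit the storage levels used in an existing job.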

Data Transmission #

Finally, we come to the stage of data transmission. As we know, data must be serialized before it can be written to disk or transmitted over the network. Spark offers developers two serializers to choose from: the Java serializer and the Kryo Serializer. Both Spark’s official documentation and technical blogs online recommend the Kryo Serializer for better efficiency. Generally speaking, the Kryo Serializer is several times better than the Java serializer in both processing efficiency and storage efficiency. Therefore, serializing data with the Kryo Serializer before it is distributed can further reduce network overhead.

However, many students complain to me: “Why is the data serialized with the Kryo Serializer larger than the data serialized with the Java serializer?” Here is the point to watch out for. If you do not explicitly register your custom data structures with the Kryo Serializer, it will still serialize them, but every serialized record will carry the class name, which is obtained through reflection and can be very long. With billions of samples, the resulting storage overhead can be considerable.
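The effect can be observed on a single record. The following sketch serializes the same object with and without registration and returns the byte count. It assumes Spark's KryoSerializer can be instantiated directly from a SparkConf outside a running application; `KryoSizeSketch` and the record type are hypothetical names, chosen to be long on purpose.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

object KryoSizeSketch {
  // Hypothetical custom type with a deliberately long class name.
  case class UserInterestProfileRecord(id: Long, groupId: Int)

  // Serializes one record and returns the number of bytes produced.
  def serializedSize(register: Boolean): Int = {
    val conf = new SparkConf(false) // do not load settings from system properties
    if (register) conf.registerKryoClasses(Array(classOf[UserInterestProfileRecord]))
    val instance = new KryoSerializer(conf).newInstance()
    instance.serialize(UserInterestProfileRecord(1L, 42)).remaining()
  }
}
```

Comparing `KryoSizeSketch.serializedSize(register = false)` with `serializedSize(register = true)` shows how many bytes per record the class name costs.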

So how do we register custom types with the Kryo Serializer? Actually, it’s very simple. We just need to call the registerKryoClasses method on SparkConf, as shown in the example code below.

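import org.apache.spark.SparkConf
// Assumption: HashMap refers to Scala's mutable HashMap; import the one you actually use
import scala.collection.mutable.HashMap

// Registering types with the Kryo Serializer
// Fill in your own master URL and application name
val conf = new SparkConf().setMaster("").setAppName("")
conf.registerKryoClasses(Array(
  classOf[Array[String]],
  classOf[HashMap[String, String]],
  classOf[MyClass] // MyClass stands for your own custom type
))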

In addition, I have summarized the configuration settings related to the Kryo Serializer in the table below so that you can look them up easily. Among them, spark.serializer explicitly selects the Kryo Serializer as Spark’s serializer. spark.kryo.registrationRequired is the interesting one. If we set it to true, the Kryo Serializer will refuse to serialize any custom type that has not been registered; instead, it throws an exception and interrupts task execution. The benefit is that during development and debugging, it helps us catch the types we forgot to register.

Kryo Serializer Configuration
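The two settings discussed above can be applied on a SparkConf as in the sketch below. `KryoConfSketch`, `UserProfile`, and the `strict` parameter are hypothetical; the two configuration keys are the ones named in the text.

```scala
import org.apache.spark.SparkConf

object KryoConfSketch {
  // Hypothetical custom type standing in for your own classes.
  case class UserProfile(id: Long, interests: Array[String])

  // strict = true makes unregistered types fail fast, which suits
  // development and debugging.
  def kryoConf(strict: Boolean): SparkConf =
    new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", strict.toString)
      .registerKryoClasses(Array(classOf[UserProfile], classOf[Array[String]]))
}
```

A reasonable workflow is to run with `strict = true` while developing, register every type the exceptions report, and then decide whether to keep the strict setting in production.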

To help you understand better, let’s compare the Java serializer and the Kryo Serializer to two different moving companies.

The Java Serializer is a well-established company with a high market share, and it is very popular because of its excellent user experience. As long as you provide your home address, the Java Serializer will send someone to your home to help you pack. You don’t need to tell them what items you have at home, as they have their own packaging standards for different types of items, which can save you a lot of trouble. However, the packaging standards of the Java Serializer are too rigid. Not only is the packaging speed slow, but the resulting packages are often oversized and take up a lot of space. You must rent the largest truck to fit all the items in your home.

The Kryo Serializer is a newcomer in the market, and it outperforms the Java Serializer in terms of packaging speed and package size. The Kryo Serializer packs items in the most compact way, without wasting any space, so all packages can be fit in a small truck. However, before subscribing to the Kryo Serializer’s services, users need to provide a detailed inventory of items, which many users find troublesome. As a result, the market share of the Kryo Serializer has not been able to rise.

Now, do you have a deeper understanding of the Java serializer and the Kryo Serializer? From this, we can see that if you want to make every effort to reduce network overhead in the data transmission process, you can try using the Kryo Serializer for data serialization. On the other hand, if you think development cost is the main pain point, then using the default Java Serializer is also acceptable.

Summary #

In this lecture, I summarized effective ways to reduce network overhead at different stages of data processing.

Firstly, in the data reading stage, to obtain the NODE_LOCAL locality level, we need to tightly couple the Spark cluster with the external storage system physically. This way, Spark retrieves data through disk I/O rather than over the network. Otherwise, the locality level degrades to RACK_LOCAL or ANY, and network overhead cannot be avoided.

Secondly, in the data processing stage, we should follow the development principle of “less is more” and use Broadcast Joins in appropriate scenarios to avoid the network overhead introduced by Shuffle. If Shuffle cannot be avoided, we can use Map-side aggregation wherever possible to reduce the amount of data distributed over the network. Additionally, if the application does not require high availability, it is advisable to avoid storage levels with a replica count greater than 1, which removes the extra overhead of copying replicas across nodes.

Lastly, before distributing data over the network, we can use the Kryo Serializer to improve the storage efficiency of serialized bytes, thus effectively reducing the amount of data distributed over the network and overall network overhead. It is important to note that in order to fully utilize the advantages of the Kryo Serializer, developers need to explicitly register custom data types; otherwise, the effect may be counterproductive.

Daily Exercise #

  1. For the Map-side aggregation example in this lecture, do you know which Map-side aggregation operator corresponds to collect_list?

  2. Can you think of any other Map-side aggregation calculation scenarios?

  3. For different stages of data processing, do you know any other methods to reduce network overhead?

Looking forward to seeing your thoughts and answers in the comment section. Feel free to share this lecture with others as well. See you in the next lecture!