33 Data Association in Stream Computing - Stream-to-Stream and Stream-to-Batch #

Hello, I am Wu Lei.

In the previous lecture, we mentioned that Structured Streaming reuses all the data processing capabilities provided by Spark SQL, such as data extraction, filtering, grouping and aggregation, association, sorting, and so on. However, among these common data processing types, there is one type of operation that requires special attention, and that is data association (Joins).

There are two main reasons for this. First, data association is widely used; it is arguably one of the most frequently used types of operations in data applications. Second, unlike data association in batch processing, data association in stream computing also has to account for the inherent constraints of stream processing, such as time windows, tolerance for late data, and output modes.

Therefore, in today’s lecture, we will focus on data association in Structured Streaming. First, we will survey Structured Streaming’s capabilities to see which types of data association it supports. Then, using a short-video recommendation example, we will experiment with and summarize the applicable scenarios and considerations for each type of data association.

Data Association in Stream Computing #

We know that if we divide data association based on the association format, it can be divided into Inner Join, Left Join, Right Join, Semi Join, Anti Join, and so on. If we divide it based on the implementation method, it can be divided into Nested Loop Join, Sort Merge Join, and Hash Join. And if we consider the data distribution pattern in a distributed environment, Join can be further divided into Shuffle Join and Broadcast Join.

The above three classification criteria are orthogonal to each other. We have introduced their respective applicable scenarios and advantages and disadvantages in the Spark SQL learning module (you can review the lessons [17] and [18] if you can’t remember).
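To tie these categories back to the DataFrame API, here is a minimal sketch; df1, df2, and the key column are hypothetical stand-ins rather than tables from this lesson. The association format is selected via the joinType argument, while the broadcast hint steers Spark toward a Broadcast Join:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

// Hypothetical DataFrames, used only to show how the classification maps onto the API
def joinExamples(df1: DataFrame, df2: DataFrame): Unit = {
  // Association format: chosen through the joinType argument
  val innerJoined = df1.join(df2, df1("key") === df2("key"), "inner")
  val semiJoined  = df1.join(df2, df1("key") === df2("key"), "left_semi")

  // Data distribution pattern: the broadcast hint turns a Shuffle Join into a Broadcast Join
  val broadcastJoined = df1.join(broadcast(df2), df1("key") === df2("key"), "inner")
}
```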

In the context of stream computing, data association can be further divided into “stream-batch association” and “stream-stream association” based on where the data comes from. “Stream-batch association” (Stream-Static Join) refers to the scenario where one table participating in the association comes from offline batch data while the other comes from a real-time data stream. In other words, dynamic real-time data streams can be associated with static offline data to give us multi-dimensional data insights.

“Stream-stream association” (Stream-Stream Join), on the other hand, refers to the case where both tables participating in the association come from data streams; it is an association between dynamic data and dynamic data, as shown in the figure below.

Image

Obviously, this classification by data source is orthogonal to the previous three criteria: association format, implementation method, and data distribution pattern. Data association was already divided in fine detail by the first three criteria; with this additional orthogonal dimension, the classification becomes even more refined.

What’s even more bothersome is that, under the Structured Streaming framework, stream-batch association and stream-stream association differ in which association formats they support and which limitations apply. This is one of the reasons why data association deserves special attention in stream processing.

Next, we will discuss stream-batch association and stream-stream association separately, covering the features they support and the limitations they impose. Starting with the easier of the two, we will first introduce stream-batch association and then move on to stream-stream association.

Stream-Batch Association #

To better illustrate stream-batch association, let’s start with a practical scenario. In today’s era of short videos, recommendation engines play a crucial role, and to achieve the best recommendation effect, the recommendation engine must rely on real-time feedback from users.

The so-called real-time feedback refers to the interactive actions we are familiar with, such as likes, comments, and shares. However, what needs to be emphasized here is the “real-time” nature or “timeliness” of these actions. After all, in the current era of abundant choices, a user’s interests and preferences also change over time, so it is important to capture their recent interests and hobbies.

Suppose we need to associate offline user attributes with real-time user feedback in order to build user feature vectors. Clearly, in this feature vector, we want to include not only the user’s own attribute fields, such as age, gender, education background, occupation, etc., but also the real-time interactive information, such as the number of likes and shares in the past hour, etc., in order to have a more comprehensive understanding of the user.

Generally speaking, real-time feedback comes from online data streams, while user attribute data is often stored in offline data warehouses or distributed file systems. Therefore, associating real-time user feedback with user attribute information is a typical stream-batch association scenario.

So, how can we “combine” offline user attributes with online user feedback in the short-video context we just described? To demonstrate the process and usage of stream-batch association, we naturally need to prepare the offline data and the online data in advance. Following the principle of keeping things simple so that you can reproduce the examples with just a laptop, we will use the local file system to store the offline user attributes.

So far, we have only demonstrated the usage of Sockets for generating data streams. In fact, in addition to Sockets used for testing, Structured Streaming also supports Kafka, files, and other Sources as data stream sources. In order to cover as many knowledge points as possible, in this lesson, let’s simulate online user feedback using files.

Remember? Structured Streaming creates various data streams using the readStream API. To create a data stream in the form of files, we only need to pass the file format to the format function and enable the corresponding options, as shown below. For the general usage of the readStream API, you can review the “Streaming Word Count” (Lesson 30).

```scala
var streamingDF: DataFrame = spark.readStream
.format("csv")
.option("header", true)
.option("path", s"${rootPath}/interactions")
.schema(actionSchema)
.load
```

For this code snippet, there are two places that deserve special attention. One is the format function, which takes a file format as its parameter, such as CSV, Parquet, ORC, and so on. The other is the option that specifies the listening directory: option("path", s"${rootPath}/interactions").

This option specifies the file system directory that Structured Streaming needs to listen to. Once new data content enters this directory, Structured Streaming loads the new data as a stream.

Note that the code above is incomplete; its purpose is to give you a first impression of the file-based Source. As we progress through the subsequent explanations, we will provide the complete code and explain each step in detail.

To generate data streams in the form of files, we only need to copy the files containing user interaction behavior to the listening directory of Structured Streaming one by one. In our example, this is the interactions directory.

Image

As shown in step 1 of the figure above, we pre-save the user feedback files in the temporary staging directory, and then copy the files one by one to the interactions directory, thus simulating the generation of data streams. As for user attribute information itself, it is offline data. Therefore, we save the relevant data files in the userProfile directory, as shown in step 3 of the figure.
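If you prefer to script the copying instead of doing it by hand, here is a minimal sketch under the same directory layout; the releaseFile helper is hypothetical and simply copies one staged file into the monitored directory:

```scala
import java.nio.file.{Files, Paths, StandardCopyOption}

// Hypothetical helper: copy one file from the staging directory into the monitored
// interactions directory, simulating new stream data arriving
def releaseFile(rootPath: String, fileName: String): Unit = {
  Files.copy(
    Paths.get(s"$rootPath/staging/$fileName"),
    Paths.get(s"$rootPath/interactions/$fileName"),
    StandardCopyOption.REPLACE_EXISTING)
}

// For example: releaseFile(rootPath, "interactions0.csv")
```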

Before walking through the stream-batch association itself, let’s first get familiar with the data so that we can better understand the subsequent code. The offline user attributes are relatively simple, containing only the id, name, age, and gender fields. The file content is as follows.

Image

The online user feedback is more complex, including fields such as userId, videoId, event, and eventTime. The first two fields represent the user ID and the short video ID, respectively, while event represents the type of interaction, including three values: Like, Comment, and Forward. eventTime represents the timestamp of the interaction, as shown below.

Image

In addition to the interactions0.csv mentioned above, to simulate the generation of data streams, I have also prepared two more files: interactions1.csv and interactions2.csv. Their schemas are completely identical to interactions0.csv, and their content is similar. For these three files, we temporarily store them in the staging directory.

Alright, after preparing the data, next, we can create DataFrames from batch data and streaming data, and then associate them to build user feature vectors. First, let’s load the data.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// The root directory for saving folders like staging, interactions, userProfile, etc.
val rootPath: String = _

// Use the read API to read offline data and create a DataFrame
val staticDF: DataFrame = spark.read
.format("csv")
.option("header", true)
.load(s"${rootPath}/userProfile/userProfile.csv")

// Define the Schema for the user feedback files
val actionSchema = new StructType()
.add("userId", "integer")
.add("videoId", "integer")
.add("event", "string")
.add("eventTime", "timestamp")

// Load streaming data using readStream API, note the difference and connection between readStream API and read API
var streamingDF: DataFrame = spark.readStream
// Specify file format
.format("csv")
.option("header", true)
// Specify the listening directory
.option("path", s"${rootPath}/interactions")
// Specify data schema
.schema(actionSchema)
.load
```

To make it easier to match the code with the computation process, I have attached the schematic diagram of stream-batch association again below. The code above corresponds to steps 2 and 3 in the figure, that is, loading the streaming data and the batch data respectively.

![Diagram](../images/ebb4bc05741f0923f83cb010c360b43b.jpg)

From the code, we can see that the usage of the readStream API is almost identical to that of the read API. Moreover, the return types of both are DataFrames. Therefore, in terms of usage, the association between streaming and batch is no different from the association with a regular DataFrame, as shown below:

```scala
// Interactions data grouping and aggregation, corresponding to step 4 in the diagram
// (col and window come from the Spark SQL functions package)
import org.apache.spark.sql.functions.{col, window}

streamingDF = streamingDF
// Create a watermark, set the maximum tolerance to 30 minutes
.withWatermark("eventTime", "30 minutes")
// Group by time window, userId, and event type
.groupBy(window(col("eventTime"), "1 hours"), col("userId"), col("event"))
// Count the interactions of different types for each time window and user
.count

/**
Stream-batch association, corresponding to step 5 in the diagram.
As you can see, in terms of usage, joining the streaming DataFrame with the
batch DataFrame is no different from joining two regular DataFrames.
*/
val jointDF: DataFrame = streamingDF.join(staticDF, streamingDF("userId") === staticDF("id"))
```
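By the way, if you ever want to confirm at runtime which of the two DataFrames is the streaming one, the isStreaming flag on DataFrame tells you. A quick check, assuming the staticDF, streamingDF, and jointDF defined above:

```scala
// read produces a batch DataFrame, readStream a streaming one
staticDF.isStreaming     // false
streamingDF.isStreaming  // true
jointDF.isStreaming      // true: the result of a stream-batch join is itself a streaming DataFrame
```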

Beyond the identical usage, the optimization methods that apply to regular DataFrame joins also apply to stream-batch association. Note, however, that with the code above, every micro-batch triggered on streamingDF scans the offline data encapsulated in staticDF once.

Obviously, this is not efficient. Drawing on the Broadcast Join optimization we learned in the Spark SQL module, we can broadcast staticDF and turn the original Shuffle Join in the stream-batch association into a Broadcast Join, improving execution performance. The optimization only takes a few lines of code, so I leave it to you as homework to practice; a minimal sketch follows below as a starting point.
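Here is that sketch, assuming the streamingDF and staticDF defined above; the broadcast function from org.apache.spark.sql.functions marks the offline table for broadcasting:

```scala
import org.apache.spark.sql.functions.broadcast

// Broadcast the (small) offline user-attribute table so that each micro-batch of the
// interaction stream joins against a broadcast copy instead of shuffling both sides
val broadcastJointDF: DataFrame = streamingDF
  .join(broadcast(staticDF), streamingDF("userId") === staticDF("id"))
```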

After completing the stream-batch association, we need to print the calculation results to the terminal. Console is one of the Sinks supported by Structured Streaming; it helps us confirm whether the results match our expectations, as shown below:

```scala
jointDF.writeStream
// Specify Console as the output Sink
.format("console")
// Specify output options
.option("truncate", false)
// Specify output mode
.outputMode("update")
// Start the stream processing application
.start()
// Wait for interruption command
.awaitTermination()
```

The above code may be familiar to you. In previous lectures, we also specified Console as the output Sink, and the steps are the same.

That wraps up the complete code for the stream-batch association example. Next, let's enter the code into spark-shell in the local environment, and then copy the interactions*.csv files from the staging folder to the interactions directory one by one to simulate data-stream generation and trigger the stream-batch association. You can download the complete code and data from the [GitHub address](https://github.com/wulei-bj-cn/learn-spark/tree/main/chapter33).

Here, I have attached part of the calculation result for your reference. The screenshot below shows the result we obtained after copying the interactions0.csv file to the interactions directory. You can verify it in your environment and continue to copy the remaining two files to the listening directory to observe the execution effect of the stream-batch association.

![Screenshot](../images/47fc7fb6a8fa32e64a7131b3489a66cb.png)

Stream-Stream Association #

After understanding stream-batch association, let’s talk about stream-stream association. Obviously, the main difference from stream-batch association lies in the source of the data. In addition, in stream-stream association, the handling of event time is particularly crucial. Why is that?

From the previous lesson, we know that in a continuous data stream there will always be Late Data. The key question Late Data raises is whether it should still be included in the current batch of the computation.

Data association is no exception, being one of the most common computations. Therefore, in stream-stream association we should use the Watermark mechanism to explicitly specify each data stream’s “tolerance” for Late Data, so that Structured Streaming does not consume excessive system resources maintaining state data. The usage of Watermark is simple; you can review the previous lesson for a recap.

Speaking of which, you may ask: “What is state data? And why does maintaining state data consume excessive system resources?” A picture is worth a thousand words. Let’s illustrate the potential problems and risks of maintaining state data through the following diagram.

Image

Suppose we have two data streams, one is a stream of short video releases, which records metadata related to short videos, such as ID, Name, etc. The other data stream is the interaction stream, which represents user interaction with short videos. In fact, in the previous example of stream-batch association, the data stream we used was also the interaction stream, which should be familiar to you.

Now, we want to calculate the popularity of short videos after a certain time period (such as 1 hour, 6 hours, 12 hours, etc.). The so-called popularity is actually the statistical count of interactive behaviors such as likes, comments, and shares.

To achieve this, we can first associate the two data streams based on the short video ID, and then perform the statistical count. The diagram above demonstrates the association process of the two data streams under the Micro-batch mode. To get to the point, let’s focus on the short video with ID = 1.

Obviously, in the video stream, each short video is released only once; even if multiple short videos have exactly the same content, they carry different ID values in the data records. In the interaction stream, however, there can be many entries with ID = 1, distributed across different Micro-batches. In fact, as long as the video is not taken down, the interaction stream will keep carrying interaction data with ID = 1 as time goes on.

In order to associate the record with ID = 1 in the video stream with the interaction data in the interaction stream, we need to keep the entire contents of batch 0 of the video stream cached in memory, waiting for the “delayed” interaction data with ID = 1. Data that must be cached for subsequent calculations, like this video stream, is called “state data”. Obviously, the longer state data lingers in memory and the larger it grows, the greater the memory pressure.

In stream-stream association, besides requiring a Watermark on each data stream, Structured Streaming also requires the join condition to constrain the event time, in order to further limit the size of the state data. What does this mean? Let’s interpret it through code, using the video stream and the interaction stream as the example.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.StructType

// The root directory of folders such as staging, videoPosting, interactions, etc.
val rootPath: String = _

// Define the video stream schema
val postSchema = new StructType().add("id", "integer").add("name", "string").add("postTime", "timestamp")
// Listen to the videoPosting directory, and load new files in real-time data stream mode
val postStream: DataFrame = spark.readStream.format("csv").option("header", true).option("path", s"${rootPath}/videoPosting").schema(postSchema).load
// Define Watermark and set the tolerance for Late Data
val postStreamWithWatermark = postStream.withWatermark("postTime", "5 minutes")

// Define the interaction stream schema
val actionSchema = new StructType().add("userId", "integer").add("videoId", "integer").add("event", "string").add("eventTime", "timestamp")
// Listen to the interactions directory, and load new files in real-time data stream mode
val actionStream: DataFrame = spark.readStream.format("csv").option("header", true).option("path", s"${rootPath}/interactions").schema(actionSchema).load
// Define Watermark and set the tolerance for Late Data
val actionStreamWithWatermark = actionStream.withWatermark("eventTime", "1 hours")

// Stream-stream association
val jointDF: DataFrame = actionStreamWithWatermark
.join(postStreamWithWatermark,
expr("""
-- Set the join keys
videoId = id AND
-- Constrain the event time
eventTime >= postTime AND
eventTime <= postTime + interval 1 hour
"""))
```

The first two parts of the code are relatively simple: they read the contents of new files from the monitored directories, create the video stream and the interaction stream respectively, and set a Watermark on each of them. We have covered all of this before, so let’s focus on the stream-stream association code at the end.

As you can see, in the join condition, besides specifying the join keys, we also need to constrain the event time of each table. Here, postTime is the event time of the video stream, and eventTime is the event time of the interaction stream. The code above means that, for any posted video, we only care about the interactions within one hour of posting; interaction data beyond one hour is excluded from the association.

In this way, on top of the “protection” offered by the Watermark mechanism, the event-time constraint further shortens how long state data must be kept in memory, thereby reducing the pressure on system resources. In short, for the maintenance of state data, the Watermark mechanism and the event-time constraint provide “double insurance”.
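For reference, the same join can also be expressed with column expressions instead of a SQL string. A minimal sketch, assuming the actionStreamWithWatermark and postStreamWithWatermark defined above:

```scala
import org.apache.spark.sql.functions.{col, expr}

// Equivalent stream-stream join: join key plus the event-time constraint,
// written as a single Column condition instead of a SQL string
val jointDF2 = actionStreamWithWatermark.join(
  postStreamWithWatermark,
  col("videoId") === col("id") &&
    col("eventTime") >= col("postTime") &&
    col("eventTime") <= col("postTime") + expr("interval 1 hour"))
```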

Key Takeaways #

Alright, that’s it for today’s content. Let’s summarize what we’ve learned. First of all, we need to know that, depending on the source of the data, Structured Streaming supports two types of association: “stream-batch association” and “stream-stream association”.

Stream-batch association combines stream processing with batch processing, enabling Structured Streaming to serve a wider range of business scenarios. It is relatively straightforward to use: read the real-time stream and the offline data with the readStream and read APIs respectively, and then perform the join with the regular join syntax.

In today’s demonstration we used a file Source, so you should become familiar with its general usage. Specifically, in the readStream API you specify the file format through the format function and the monitored directory through the path option. Once a new file is moved into the monitored directory, Spark loads the new data as a data stream.

For stream-stream association, it is important to understand that Structured Streaming needs to cache and maintain state data in this mode. State data is maintained mainly to ensure the logical consistency of the computation: in order to include late data that meets the conditions, Structured Streaming has to keep state data in memory. Needless to say, the accumulation of state data puts pressure and risk on system resources.

To alleviate this pressure and risk, in stream-stream association we should, first, set a Watermark on both participating data streams. Moreover, at the syntax level, Structured Streaming requires the join condition to constrain the event time. With this “double insurance”, developers can keep the performance risk of state-data maintenance within a controllable range, implementing the business logic while keeping the application running stably.

Exercise Questions #

There are two questions for today.

The first question: in the stream-batch association section, I demonstrated the join using only the interactions0.csv file. Please try it out in your environment and continue copying the remaining two files (interactions1.csv, interactions2.csv) to the monitored directory to further observe the behavior of the stream-batch association.

The second question: in stream-stream association, we need the Watermark and the event-time constraint in the join condition to limit the cost and overhead of maintaining state data. Do we also need the same constraints in stream-batch association? Why or why not?

Feel free to communicate with me in the comment section and I also recommend you to share this lecture with more colleagues and friends.