32 Window Operations and Watermark - What Outstanding Mechanisms Does the Streaming Engine Provide? #

Hello, I am Wu Lei.

In the previous lecture, we learned about the computing model and fault-tolerance mechanisms of Structured Streaming from a theoretical perspective. Understanding these fundamental principles will help us establish a solid foundation for developing stream processing applications.

In the lecture on “Streaming Word Count,” we demonstrated the general process of stream processing development within the Structured Streaming framework. With the readStream API and writeStream API, we can easily retrieve data streams from a source and write processed data to a sink, similar to working with DataFrames.

In today’s lecture, we will continue discussing the features and capabilities that the Structured Streaming stream processing engine provides to developers from a functional perspective. These features and capabilities enable you to design and implement stream processing applications more flexibly.

How to Benefit From Structured Streaming? #

After studying the computing model, we know that the execution plan of a streaming job, whether it is the multiple jobs triggered by micro-batches in Batch mode or the single long-running job in Continuous mode, is ultimately optimized and executed by Spark SQL and Spark Core.

Image

This brings two benefits. On one hand, all the development capabilities supported by Spark SQL, whether it’s the rich DataFrame operators or flexible SQL queries, can be used in the Structured Streaming engine. Based on what we have learned before, we can perform various transformations and aggregations on DataFrames built on streaming data, just like processing regular DataFrames.

On the other hand, since the entry point for development is also a DataFrame, streaming applications can also enjoy the “performance bonus” provided by Spark SQL. In the Spark SQL learning module, we learned about the Catalyst optimizer and Tungsten, which optimize user code to improve application performance.

Therefore, in terms of framework capabilities, we can simply summarize that Structured Streaming has all the capabilities of Spark SQL. Beyond these basic data processing capabilities, the Structured Streaming engine also provides capabilities designed specifically for stream processing, such as window operations, the Watermark mechanism, and the handling of late data.

Window Operations #

Let’s first talk about window operations in Structured Streaming. A window operation consumes and processes the messages in a data stream by time window. What does that mean? First, we need to understand two basic concepts: Event Time and Processing Time.

Event Time refers to the time when the message is generated. For example, if we type “Apache Spark” in netcat at the timestamp “2021-10-01 09:30:00”, then this time is the event time of the message “Apache Spark”.

Image

On the other hand, Processing Time refers to the time when this message arrives at the Structured Streaming engine. Some people also refer to it as Arrival Time, which is the time when the message arrives at the stream processing system. Clearly, the processing time lags behind the event time.

Window operations in Structured Streaming are based on either event time or processing time. They divide the data stream into time windows of a fixed size and process the messages that fall within each window. Structured Streaming supports two types of windowing: Tumbling Window and Sliding Window.

We can remember the difference between the two with a simple sentence: tumbling windows never overlap, while sliding windows may overlap or leave gaps between them, as shown in the following image.

Image

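It is easy to see that whether sliding windows overlap or leave gaps depends on the relationship between the window interval (how often a new window starts) and the window size: an interval smaller than the size produces overlapping windows, and an interval larger than the size produces gaps. Note that Spark’s window function requires the interval to be no larger than the window size, so the sliding windows it creates overlap rather than leave gaps. Tumbling Window and Sliding Window are equally useful, and the choice depends on the application scenario and business requirements.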
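The relationship between window size and window interval can be sketched in plain Scala, without Spark. The helper `windowsFor` below is hypothetical (it is not a Spark API), and it assumes that windows are aligned to multiples of the interval counted from midnight; it only illustrates which windows a single event time falls into.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.temporal.ChronoUnit

object WindowAssignment {
  // Hypothetical helper, not part of Spark: returns every [start, end) window
  // that contains `eventTime`. Assumption: window starts are multiples of
  // `slide`, counted from midnight of the event's day.
  def windowsFor(eventTime: LocalDateTime,
                 size: Duration,
                 slide: Duration): List[(LocalDateTime, LocalDateTime)] = {
    require(slide.getSeconds > 0 && size.getSeconds > 0, "durations must be positive")
    val dayStart = eventTime.truncatedTo(ChronoUnit.DAYS)
    val offset   = Duration.between(dayStart, eventTime).getSeconds
    val slideSec = slide.getSeconds
    val sizeSec  = size.getSeconds
    // Latest window start that is at or before the event time
    val lastStart = offset - offset % slideSec
    // Step back through earlier starts for as long as the window still covers the event
    Iterator.iterate(lastStart)(_ - slideSec)
      .takeWhile(start => start + sizeSec > offset)
      .map(start => (dayStart.plusSeconds(start), dayStart.plusSeconds(start + sizeSec)))
      .toList
      .reverse
  }
}
```

Under these assumptions, an event at 09:39:00 with a 5-minute size and a 5-minute interval (a tumbling window) falls into the single window 09:35 to 09:40. With a 3-minute interval it falls into two overlapping windows, 09:36 to 09:41 and 09:39 to 09:44. With a 3-minute size and a 5-minute interval it falls into no window at all, which is the gap case.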

Enough with the theory, let’s make some adjustments to the “Streaming Word Count” we discussed before to demonstrate the Window operations in Structured Streaming. To make the demonstration clearer, we will use Tumbling Window as the windowing method, and you can consider Sliding Window as a homework assignment.

To perform word counting based on windows, we only need to adjust the code in the data processing part. The code for readStream and writeStream (Update Mode) does not need any modifications. So, to focus on learning Window operations, I will only post the modified part below.

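// In spark-shell, spark.implicits._ (needed for $"...") and the functions below are already in scope;
// the explicit import is only needed elsewhere. df must be declared with var to be reassigned.
import org.apache.spark.sql.functions.{col, element_at, explode, split, window}

df = df.withColumn("inputs", split($"value", ","))
  // Extract event time
  .withColumn("eventTime", element_at(col("inputs"),1).cast("timestamp"))
  // Extract word sequence
  .withColumn("words", split(element_at(col("inputs"),2), " "))
  // Explode word sequence into one row per word
  .withColumn("word", explode($"words"))
  // Group by Tumbling Window and word
  .groupBy(window(col("eventTime"), "5 minute"), col("word"))
  // Count
  .count()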

To simulate event time, the messages we input into netcat will include both a timestamp and a word sequence. They are separated by commas, and the words are separated by spaces, as shown in the table below.

Image

Therefore, for processing the input data, we first need to extract the timestamp and word sequence separately, and then explode the word sequence into individual words. Next, we group by the time window and word. Here, we need to pay special attention to this line of code:

// Group by Tumbling Window and word
.groupBy(window(col("eventTime"), "5 minute"), col("word"))

The expression window(col("eventTime"), "5 minute") creates tumbling time windows with a duration of 5 minutes, based on the event time. The first parameter of the window function is the time column used to create the windows, and the second parameter specifies the window size. You may now wonder, “What if I want to create a Sliding Window?”

It is actually very simple. Just add a third parameter to the window function call, which specifies the window interval. For example, if we still want to create windows with a size of 5 minutes but use a sliding interval of 3 minutes, we can do it as follows: window(col("eventTime"), "5 minute", "3 minute"). Easy, right?

After grouping by window and word, we can use the count operation to calculate the word counts. It is not difficult to see that most of the transformations in the code are common DataFrame operators. This confirms what we mentioned earlier: Structured Streaming inherits the development capabilities of Spark SQL and, with them, its “performance bonus.”
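To make the logic of this pipeline concrete, here is a minimal sketch of the same steps (parse, explode, group by window and word, count) on ordinary Scala collections instead of a streaming DataFrame. The object `WindowedWordCount` and its helpers are hypothetical names introduced for illustration; the sketch assumes every input line has the form "timestamp,word sequence" and that the window size in minutes divides 60.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

object WindowedWordCount {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Start of the tumbling window containing `t`
  // (assumption: sizeMinutes divides 60, so windows align to the hour)
  def windowStart(t: LocalDateTime, sizeMinutes: Int = 5): LocalDateTime = {
    val minute = t.truncatedTo(ChronoUnit.MINUTES)
    minute.minusMinutes(minute.getMinute % sizeMinutes)
  }

  // Counts words per (window start, word); a line without a comma would fail the match
  def count(lines: Seq[String]): Map[(LocalDateTime, String), Int] =
    lines
      .map(_.split(",", 2))                      // split into timestamp and word sequence
      .flatMap { case Array(ts, words) =>
        val start = windowStart(LocalDateTime.parse(ts.trim, fmt))
        // "explode": one (window, word) pair per word
        words.trim.split(" ").filter(_.nonEmpty).map(w => (start, w)).toSeq
      }
      .groupBy(identity)                         // group by window and word
      .map { case (key, occurrences) => key -> occurrences.size }
}
```

For example, `WindowedWordCount.count(Seq("2021-10-01 09:30:00,Apache Spark", "2021-10-01 09:39:00,Spark Streaming"))` counts “Spark” once in the window starting at 09:30 and once in the window starting at 09:35, because the two messages fall into different 5-minute windows.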

Once the code is ready, we can enter it into spark-shell line by line and wait for the data stream from netcat. Switch to the netcat terminal and enter the text content one message at a time (note that the messages should be entered gradually, not all at once). We can then see the computed results in the spark-shell terminal.

Image

As you can see, unlike the “Streaming Word Count,” the results here are counted per window (5 minutes). For each time window, the Structured Streaming engine counts the words of the messages that fall within that window. It can be inferred that, as time moves forward, windows that have already been processed will no longer be updated.

For example, when the engine processes the message “2021-10-01 09:39:00,Spark Streaming” (referred to as message 39), the state of the previous window “{2021-10-01 09:30:00, 2021-10-01 09:35:00}” (referred to as window 30-35), that is, the count of each word in that window, should in theory no longer change.

At this point, you may have a question: “That’s not necessarily true! What if the engine receives another message with an event time falling within window 30-35 after message 39?” To answer this question, we need to talk about Late data and the Watermark mechanism in Structured Streaming.

Late Data and Watermark #

Let’s start with Late Data. Late Data refers to messages that arrive out of order with respect to their event time: they reach the engine later than messages that were generated after them. Although it may sound a bit complicated, we can instantly understand the meaning of Late Data through the following illustration:

Image

Generally speaking, the order in which messages are generated should be consistent with the order in which they arrive at the stream processing engine. In other words, messages generated earlier should arrive earlier, just like the gray messages shown in the above figure.

However, in reality, there are always some messages that have a significant deviation between their processing time and event time due to network delays or various other reasons. These messages may even arrive at the engine later than the messages generated after them. We collectively refer to these messages as “Late Data,” as shown by the red part of the messages in the figure.

Due to the existence of Late Data, the stream processing engine needs a mechanism to determine the validity of Late Data and decide whether to include the late arriving messages in the calculation of the previous window.

For example, let’s consider the red message “Spark is cool.” When it arrives at the Structured Streaming engine, its event time window “{2021-10-01 09:30:00, 2021-10-01 09:35:00}” has already closed. In this case, should Structured Streaming use the words in the message “Spark is cool” to update the state (word count) of window 30-35?

To solve the problem of Late Data, Structured Streaming adopts a mechanism called Watermark. To help you understand the principle of the Watermark mechanism more easily, let’s clarify two concepts that sound very similar but are quite different: watermark and waterline.

To explain the concepts of watermark and waterline clearly, let’s conduct a thought experiment. Suppose there is a box of fresh milk, a straw, and a glass on a table. We open the box and pour all the milk into the glass. Then, we insert the straw into the glass and drink a sip of fresh milk through the straw. Alright, the experiment is done. Now, let’s use it to clarify the concepts.

Image

As shown in the figure, we initially pour the milk up to the height indicated by the watermark. Then, we drink the milk through the straw. Regardless of how much milk we drink, the milk trace at the watermark position does not disappear, which means the watermark’s position is relatively fixed. The waterline is different. The more we drink, the lower the waterline drops, until we finish all the milk and the waterline reaches the bottom of the glass.

Now that we have clarified the concepts of watermark and waterline, we need to relate these two concepts to the concepts in stream processing. After all, the “pour milk” thought experiment is used to help us learn about the Watermark mechanism.

Firstly, both the watermark and the waterline correspond to event times of messages. The watermark (the milk trace) corresponds to the maximum event time among all the messages the system has received so far. The waterline corresponds to that maximum event time minus a user-defined tolerance, which we will call T for convenience. In Structured Streaming, the event time at the waterline is what the engine calls the Watermark (capitalized from here on), so Watermark = maximum event time - T, as shown in the following figure.

Image

Obviously, as the stream processing engine keeps receiving messages, the watermark and the waterline change accordingly. This process is similar to the “pouring milk, drinking milk” operation we just performed. Whenever the event time of a new message is greater than the current watermark, the system updates the watermark, which is like pouring milk into the glass until it reaches the position of the new maximum event time. Then, we drink through the straw and remove milk of depth T, so that the waterline drops to the position of the Watermark.
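The bookkeeping described above can be sketched in a few lines of plain Scala. The case class `WatermarkState` is a hypothetical model introduced for illustration, not Spark’s internal implementation. It also simplifies the timing: it updates on every message, whereas the real engine advances the Watermark between micro-batches.

```scala
import java.time.{Duration, LocalDateTime}

// Hypothetical model of the "watermark" (maximum event time) and the
// "waterline" (Spark's Watermark); not Spark's internal code.
final case class WatermarkState(maxEventTime: LocalDateTime, tolerance: Duration) {

  // Spark's Watermark: the maximum event time seen so far minus the tolerance T
  def watermark: LocalDateTime = maxEventTime.minus(tolerance)

  // Only a newer event time raises the maximum; older event times leave the state unchanged
  def observe(eventTime: LocalDateTime): WatermarkState =
    if (eventTime.isAfter(maxEventTime)) copy(maxEventTime = eventTime) else this
}
```

With T set to 10 minutes, observing an event time of 09:44:00 moves the Watermark to 09:34:00, and a later message with event time 09:32:00 leaves both values where they are.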

Now that we have related different concepts, let’s formally introduce the Watermark mechanism in Structured Streaming. As mentioned earlier, the Watermark mechanism is used to determine which Late Data can participate in updating the state of previous windows and which Late Data should be discarded.

If we try to explain the Watermark mechanism with words, it is easy to confuse people. Therefore, let’s use a flowchart to illustrate the process.

Image

As you can see, when a new message arrives in the system, Structured Streaming first checks whether its event time is greater than the watermark, that is, the maximum event time seen so far. If it is, the Watermark mechanism updates both the watermark and the waterline, namely the maximum event time and the Watermark.

Conversely, suppose the event time of the incoming message is not greater than the current watermark. In that case, the system further compares the message’s event time with the “lower edge of the Watermark time window.” The so-called “lower edge of the Watermark time window” is the start time of the time window to which the Watermark belongs. Let’s take an example to illustrate. Suppose the Watermark is “2021-10-01 09:34:00”, and the event time window size is 5 minutes. Then, the time window in which the Watermark is located is [“2021-10-01 09:30:00”, “2021-10-01 09:35:00”), which corresponds to window 30-35. At this point, the “lower edge of the Watermark time window” is the start time of window 30-35, which is “2021-10-01 09:30:00”, as shown in the following figure.

Image

For a newly arrived message, if its event time is at or after the “lower edge of the Watermark time window”, the message can participate in updating the state of past windows. Otherwise, the message is discarded by the system and does not participate in the calculation. In other words, any message with an event time earlier than the “lower edge of the Watermark time window” is considered too late by the system and is not qualified to update windows that have already been calculated.
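The acceptance rule can be written down as a small check in plain Scala. The object `LateDataCheck` and its functions are hypothetical names that follow the description in this lecture rather than Spark’s source code. The sketch assumes tumbling windows whose size in minutes divides 60.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.temporal.ChronoUnit

object LateDataCheck {
  // Start of the tumbling window containing `t`
  // (assumption: sizeMinutes divides 60, so windows align to the hour)
  def windowStart(t: LocalDateTime, sizeMinutes: Int): LocalDateTime = {
    val minute = t.truncatedTo(ChronoUnit.MINUTES)
    minute.minusMinutes(minute.getMinute % sizeMinutes)
  }

  // "Lower edge of the Watermark time window":
  // the start of the window that contains (maximum event time - T)
  def lowerEdge(maxEventTime: LocalDateTime, tolerance: Duration, sizeMinutes: Int): LocalDateTime =
    windowStart(maxEventTime.minus(tolerance), sizeMinutes)

  // A message is accepted unless its event time is earlier than the lower edge
  def accepted(eventTime: LocalDateTime,
               maxEventTime: LocalDateTime,
               tolerance: Duration,
               sizeMinutes: Int): Boolean =
    !eventTime.isBefore(lowerEdge(maxEventTime, tolerance, sizeMinutes))
}
```

Taking the example above, a maximum event time of 09:44:00 with T = 10 minutes gives a Watermark of 09:34:00 and a lower edge of 09:30:00. A message with event time 09:31:00 is accepted, while one with event time 09:29:59 is discarded.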

It can be seen that in this process, the tolerance for delay T is the determining factor in the Watermark mechanism. It determines the extent to which delayed messages can be tolerated and accepted by the system. So, how do users set this T? Furthermore, under the development framework of Structured Streaming, how does the Watermark mechanism take effect?

In fact, to enable the Watermark mechanism and set the tolerance T, we only need one line of code. Next, let’s take the “Word Count with Windowed Stream” example we just mentioned to demonstrate and explain the specific usage of the Watermark mechanism.

df = df.withColumn("inputs", split($"value", ","))
  // Extract event time
  .withColumn("eventTime", element_at(col("inputs"),1).cast("timestamp"))
  // Extract word sequence
  .withColumn("words", split(element_at(col("inputs"),2), " "))
  // Explode word sequence into one row per word
  .withColumn("word", explode($"words"))
  // Enable the Watermark mechanism and specify T as 10 minutes
  .withWatermark("eventTime", "10 minute")
  // Group by Tumbling Window and word
  .groupBy(window(col("eventTime"), "5 minute"), col("word"))
  // Perform counting
  .count()

As you can see, except for the line .withWatermark("eventTime", "10 minute"), the rest of the code is the same as the “Word Count with Windowed Stream” example. Here, we use the withWatermark function to enable the Watermark mechanism. The function has two parameters: the first is the name of the event time column, and the second is the tolerance T specified by the user. Note that withWatermark must be called before the groupBy aggregation, and on the same event time column that the window is built on, for the Watermark to take effect.

To demonstrate the effect of the Watermark mechanism, let’s make some adjustments to the input data stream from netcat, as shown in the following table. Note that message 7 “Test Test” and message 8 “Spark is cool” are both considered as late data.

Image

Based on our analysis of the Watermark mechanism, with a tolerance T of 10 minutes, the late data message 8 “Spark is cool” will be accepted and consumed by the system, while the message 7 “Test Test” will be discarded. You can infer this conclusion on your own before looking at the following results demonstration.

Image

In the above image, the left side shows the output on the spark-shell when message 7 “Test Test” is input. As you can see, message 7 is discarded by the system and not included in the calculation. On the right side, you can see the result of message 8 “Spark is cool”. The words “Spark”, “is”, and “cool” have successfully updated the state of the previous window 30-35 (note that the count of “Spark” here is 3, not 1).

Key Review #

Alright, we have finished discussing today’s content. Let’s summarize it together. First of all, we need to know that when it comes to data processing, Structured Streaming can fully leverage the existing functionality and performance advantages of Spark SQL. Therefore, developers can easily use DataFrame operators or SQL statements to process streaming data.

Furthermore, we need to pay special attention to and master the window operations and the Watermark mechanism in Structured Streaming. Structured Streaming supports two types of windows: the Tumbling Window, whose windows never overlap, and the Sliding Window, whose windows may overlap. Neither is superior to the other. As developers, we can use the window function and combine its parameters (event time, window size, and window interval) to flexibly choose between the two types of windows.

For the handling of late data, Structured Streaming uses the Watermark mechanism to determine whether a late message should participate in the computation and update of past windows. I have summarized the working principle of the Watermark mechanism in the following flowchart for you to refer to at any time.

Image

Practice for each lesson #

  1. Starting from the Tumbling Window code, please modify it to use a Sliding Window.
  2. Regarding the example in the Watermark mechanism, please analyze why message 8 “Spark is cool” is accepted and processed by the system, while message 7 “Test Test” is discarded.

Feel free to leave me a message in the comments section to discuss and exchange ideas. I also recommend sharing this lesson with more colleagues and friends.