
31 The New Generation of Stream Processing Frameworks - Which is Stronger, Batch mode or Continuous mode? #

Hello, I’m Wu Lei.

In the previous lecture, we got to know Structured Streaming through the “Streaming Word Count” example and learned about the three essential elements of stream processing development: Source, stream processing engine, and Sink.

Image

In today’s lecture, let’s focus on Structured Streaming, which is the stream processing engine itself. Structured Streaming is an important sub-framework of Spark, alongside Spark MLlib. It’s worth mentioning that Structured Streaming naturally benefits from the processing power and performance provided by Spark SQL and seamlessly integrates with other sub-frameworks. Therefore, stream processing applications developed based on this new generation framework, Structured Streaming, have excellent execution performance and scalability.

Knowing oneself and the enemy leads to victory in every battle. To flexibly meet different real-time computing requirements, we need to first understand what the computation model of Structured Streaming looks like and how it handles fault tolerance and maintains data consistency. Let’s start with the computation model.

Computing Model #

When data flows into the Structured Streaming engine as a continuous stream, the engine does not consume and process it on its own. It relies on a mechanism called Trigger to start the computation over the data it has received.

In other words, the Trigger mechanism determines when, how, and at what frequency the engine processes the received data stream. Structured Streaming supports four types of Triggers, as shown in the table below.

Image

To set a Trigger for a streaming query, we only need to call the trigger function on the writeStream API. With so many types of Triggers, it can be overwhelming to dive into the details all at once. For now, you only need to know that Structured Streaming supports a variety of Triggers.
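As a rough illustration of calling the trigger function, the sketch below sets each of the four Trigger types on the same query. It assumes `df` is a non-aggregated streaming DataFrame and that its source supports whichever Trigger is chosen (Continuous in particular works only with certain sources). The object and method names are hypothetical. Only `Trigger.ProcessingTime`, `Trigger.Once`, and `Trigger.Continuous` come from Spark's `org.apache.spark.sql.streaming.Trigger` class.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Hypothetical helper object; `df` is assumed to be a streaming DataFrame.
object TriggerSketch {

  // Default: no trigger call at all. The next micro-batch starts
  // as soon as the previous one has finished.
  def withDefault(df: DataFrame): StreamingQuery =
    df.writeStream.format("console").start()

  // Fixed interval: cut one micro-batch every 5 seconds.
  def withFixedInterval(df: DataFrame): StreamingQuery =
    df.writeStream.format("console")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

  // One-time: process the data available now as a single micro-batch, then stop.
  def withOneTime(df: DataFrame): StreamingQuery =
    df.writeStream.format("console")
      .trigger(Trigger.Once())
      .start()

  // Continuous: a long-running job with an Epoch interval of 1 second.
  def withContinuous(df: DataFrame): StreamingQuery =
    df.writeStream.format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()
}
```

The first three variants all run in Batch mode and differ only in how micro-batches are cut, while the last one switches the query to Continuous mode.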

Let’s focus on the computing model. For streaming data, Structured Streaming supports two computing models: Batch mode and Continuous mode. The essence of the computing model is how Spark treats and processes streaming data.

What does this mean? Without comparison, there is no way to distinguish. Let’s explain the meaning of the computing model by comparing Batch mode and Continuous mode.

Batch mode #

Let’s talk about Batch mode first. Batch mode refers to Spark dividing the continuous data stream into discrete micro-batches, which are small portions of data sets.

To put it more vividly, Batch mode is like “cutting water with a knife”. The amount of water between two cuts is a micro-batch. Each micro-batch triggers a Spark Job, and each Job contains several Tasks. After learning the basics and the Spark SQL module, we know that these Tasks will eventually be optimized and executed by Spark SQL and Spark Core.

Image

In this computing model, different types of Triggers, such as Default, Fixed interval, and One-time, are just different ways to control the granularity of micro-batch cutting.

For example, with the Default Trigger, Spark starts the next micro-batch as soon as the previous one finishes, so the cutting granularity follows from the rate at which data flows in and developers do not need to worry about it. However, if developers want to control the cutting granularity, they can use the Fixed interval Trigger to explicitly define the time period for micro-batch cutting. For example, Trigger.ProcessingTime("5 seconds") means cutting a micro-batch every 5 seconds.
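To make the “cutting water with a knife” picture concrete, here is a small plain-Scala simulation that does not use Spark at all. It assumes each event carries an arrival time in milliseconds and groups events into micro-batches by a fixed interval. The `Event` type and the `cut` function are hypothetical names for illustration, not how Spark implements micro-batching internally.

```scala
object MicroBatchSketch {
  // A hypothetical event: arrival time in milliseconds plus its payload.
  final case class Event(arrivalMs: Long, word: String)

  // "Cut" the stream every `intervalMs`: events whose arrival times fall
  // into the same interval end up in the same micro-batch (keyed by index).
  def cut(events: Seq[Event], intervalMs: Long): Map[Long, Seq[Event]] =
    events.groupBy(e => e.arrivalMs / intervalMs)
}
```

With a 5-second interval, events arriving at 1.0 s and 4.999 s fall into batch 0, while an event arriving at exactly 5.0 s starts batch 1.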

Continuous mode #

Unlike Batch mode, Continuous mode does not divide the data stream but processes the data continuously at the granularity of events/messages. Here, events or messages refer to the finest-grained form of data in the original data stream, which can be a single word, a line of text, or a frame of an image.

Taking “Streaming Word Count” as an example, the events/messages in the Source are individual English words. You might have a question at this point: “In Batch mode, isn’t Structured Streaming also continuously creating micro-batches? The data is still complete and not lost. What is the fundamental difference between Continuous mode and Batch mode?”

Image

A picture is worth a thousand words. By comparing the schematic diagrams of the two computing models, we can easily find the differences between them. In Continuous mode, Structured Streaming uses a single long-running job to process (that is, to serve) every message in the data stream.

Now, the question is, compared to triggering a job for each micro-batch, what are the special benefits of Continuous mode, where a long-running job is used for processing? In other words, what are the advantages and disadvantages of these two different computing models?

In summary, Batch mode has higher throughput and higher latency (in seconds), while Continuous mode has lower throughput and lower latency (in milliseconds). Throughput refers to the number of messages processed by the engine per unit of time. Batch data can better leverage the advantages of the Spark distributed computing engine, so Batch mode naturally has a higher throughput.

To answer why Continuous mode performs better in terms of latency, we need to start with the fault tolerance mechanism of Structured Streaming.

Fault Tolerance Mechanism #

Fault tolerance is an essential capability for any stream processing engine. Fault tolerance refers to the ability of the stream processing engine to recover from errors (at the job or task level, for example) during the computation process while ensuring data integrity, i.e., consistent data processing.

From the perspective of data consistency, this fault tolerance capability can be divided into three levels:

  • At most once: data can be delivered at most once, with the risk of data loss;
  • At least once: data can be delivered at least once, with the possibility of duplication;
  • Exactly once: data can be delivered and processed exactly once, without duplication or omission.

Image

Here, “delivery” refers to the entire process of data being processed from the source to the sink. For the same data, it may be processed by the engine once or multiple times (in the event of job or task failure), but depending on the fault tolerance capability, the final computation results may be delivered to the sink zero times, once, or multiple times.

Now that we’ve discussed the basic concept of fault tolerance, let’s talk about Structured Streaming. Regarding its fault tolerance capability, the official Spark community states, “By combining an idempotent sink, Structured Streaming can provide exactly-once fault tolerance.”

In reality, this statement has two parts. On the data processing side, Structured Streaming’s own fault tolerance mechanism provides “at least once” processing. Combined with an idempotent sink, Structured Streaming then achieves end-to-end “exactly once” fault tolerance.

For example, the widely used Kafka provides idempotency at the producer level, so when Kafka is used as the sink, messages can be delivered without duplication or omission.
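The following plain-Scala sketch illustrates why “at least once” processing plus an idempotent sink yields an “exactly once” result. It is a toy model under stated assumptions: every record carries a unique key, and a failure is simulated by re-sending the last record. `Record`, `AppendSink`, `UpsertSink`, and `deliverAtLeastOnce` are hypothetical names and do not correspond to any Spark or Kafka API.

```scala
import scala.collection.mutable

object IdempotentSinkSketch {
  // Each record carries a unique key so that a replay can be recognised.
  final case class Record(key: String, value: Int)

  // Non-idempotent sink: every delivery is appended, so replays create duplicates.
  final class AppendSink {
    val rows = mutable.ArrayBuffer.empty[Record]
    def write(r: Record): Unit = rows += r
  }

  // Idempotent sink: writing the same key again leaves the final state unchanged.
  final class UpsertSink {
    val rows = mutable.LinkedHashMap.empty[String, Int]
    def write(r: Record): Unit = rows(r.key) = r.value
  }

  // "At least once": after a simulated failure, the last record is delivered again.
  def deliverAtLeastOnce(records: Seq[Record], write: Record => Unit): Unit = {
    records.foreach(write)
    records.lastOption.foreach(write) // replay after recovery
  }
}
```

Delivering two records this way leaves three rows in the `AppendSink` but only two in the `UpsertSink`, which is the end-to-end “exactly once” effect described above.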

However, to achieve “at least once” data processing, Structured Streaming’s own fault tolerance mechanism is implemented differently for the two computation models, batch mode and continuous mode. This difference in implementation is also one of the main reasons for the significant gap in latency between the two models.

Next, let’s discuss how batch mode and continuous mode achieve fault tolerance.

Fault Tolerance in Batch Mode #

In batch mode, Structured Streaming uses the checkpointing mechanism to achieve fault tolerance. Before processing the actual data stream in a micro-batch, the checkpointing mechanism stores all the metadata of that micro-batch to the developer-specified file system path, such as HDFS or Amazon S3. In this way, when a job or task failure occurs, the engine only needs to read the pre-recorded metadata to resume the data stream from the point of failure.

To specify the checkpoint directory, you only need to set the checkpointLocation option through the option function of the writeStream API. Using the “Streaming Word Count” example from the previous lesson, you can make the following modification to the code:

df.writeStream
  // Specify the sink as the console
  .format("console")
  // Specify output options
  .option("truncate", false)
  // Specify the checkpoint storage location
  .option("checkpointLocation", "path/to/HDFS")
  // Specify the output mode
  .outputMode("complete")
  //.outputMode("update")
  // Start the streaming application
  .start()
  // Block until the query is terminated
  .awaitTermination()

Under the Checkpoint storage directory, there are several subdirectories: offsets, sources, commits, and state. They store the metadata logs for each micro-batch. The actual content recorded in each subdirectory is illustrated below for your reference.

For each micro-batch, before it is actually processed by the Structured Streaming engine, the Checkpoint mechanism first writes its metadata to a log file. Therefore, these log files are also known as Write Ahead Logs (WAL).

In other words, after data flows into the source, it must first “report” to the Checkpoint directory before it can be processed by the Structured Streaming engine. Without a doubt, this “reporting” step adds to the end-to-end processing latency, as shown in the diagram below.

Image
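The write-ahead idea can be sketched in plain Scala as follows. This is a deliberately simplified model, not Spark’s actual implementation: two in-memory collections stand in for the offsets and commits logs of the Checkpoint directory, and a flag simulates a crash during processing. All names (`WalSketch`, `Checkpoint`, `runBatch`, `batchesToReplay`) are hypothetical.

```scala
import scala.collection.mutable

object WalSketch {
  // Offsets planned for one micro-batch: [start, end).
  final case class OffsetRange(start: Long, end: Long)

  // In-memory stand-ins for the `offsets` and `commits` logs.
  final class Checkpoint {
    val offsets = mutable.Map.empty[Long, OffsetRange] // written BEFORE processing
    val commits = mutable.Set.empty[Long]              // written AFTER processing
  }

  // Run one micro-batch; `fail = true` simulates a crash while processing.
  def runBatch(cp: Checkpoint, batchId: Long, range: OffsetRange, fail: Boolean): Unit = {
    cp.offsets(batchId) = range // 1. write ahead: record the plan first
    if (fail) throw new RuntimeException("simulated failure")
    // 2. process the data in [range.start, range.end) (omitted here)
    cp.commits += batchId       // 3. mark the batch as completed
  }

  // On restart: batches that were logged but never committed must be replayed.
  def batchesToReplay(cp: Checkpoint): Seq[(Long, OffsetRange)] =
    cp.offsets.toSeq
      .filterNot { case (id, _) => cp.commits.contains(id) }
      .sortBy(_._1)
}
```

Because step 1 always happens before step 2, a crash in the middle of a batch still leaves enough metadata to resume from the point of failure, at the cost of one extra write before any data is processed.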

In addition, since each micro-batch triggers a Spark job, the frequent scheduling of jobs and tasks introduces computational overhead, which adds further latency. Because of both its execution model and its fault tolerance mechanism, the latency of batch mode usually stays on the order of seconds and reaches a few hundred milliseconds only in the best case.

Fault tolerance in Continuous mode #

Compared to batch mode, fault tolerance in continuous mode is not as complicated. In continuous mode, Structured Streaming utilizes the Epoch Marker mechanism to achieve fault tolerance.

Since continuous mode does not involve micro-batching by nature, it does not have the delays associated with micro-batching. Messages that reach the source in continuous mode can be consumed and processed by the Structured Streaming engine immediately. However, this poses a problem - how does the engine persist the current processing progress in order to facilitate failure recovery?

To address this problem, Spark introduced the Epoch Marker mechanism. An Epoch Marker can be understood as a “cursor” in a flowing river. These “cursors” move along with the flow of data. Each cursor marks the boundary of an Epoch, and the data between two adjacent cursors forms one Epoch. Developers can specify the Epoch interval with the following statement.

writeStream.trigger(Trigger.Continuous("1 second"))

Taking the code above as an example, for the data stream in the source, Structured Streaming inserts an Epoch Marker every 1 second, and the data between two Epoch Markers is regarded as one Epoch. You may ask, “I understand the concept of an Epoch Marker, but what is it used for?”

During the process of handling and delivering data, whenever an Epoch Marker is encountered, the engine writes the Offset of the last message in the corresponding Epoch to the log, achieving fault tolerance. It should be noted that writing to the log is asynchronous, so this process does not introduce latency to data processing.
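As a minimal plain-Scala sketch of this “process first, log afterwards” behavior, the simulation below walks through a stream of messages interleaved with Epoch Markers. It makes two simplifying assumptions: “processing” just upper-cases the payload, and the log write happens synchronously, whereas the real engine writes it asynchronously. The types `Message` and `EpochMarker` and the function `run` are hypothetical.

```scala
object EpochMarkerSketch {
  // What the long-running job sees: either a message or an Epoch Marker.
  sealed trait Element
  final case class Message(offset: Long, payload: String) extends Element
  final case class EpochMarker(epoch: Long) extends Element

  // Returns the processed payloads and the log of epoch -> last processed offset.
  def run(stream: Seq[Element]): (Vector[String], Map[Long, Long]) = {
    var processed = Vector.empty[String]
    var epochLog = Map.empty[Long, Long]
    var lastOffset = Option.empty[Long]

    stream.foreach {
      case Message(offset, payload) =>
        processed :+= payload.toUpperCase // the message is processed immediately
        lastOffset = Some(offset)
      case EpochMarker(epoch) =>
        // Only now is the offset of the last processed message written to the log.
        lastOffset.foreach(o => epochLog += (epoch -> o))
    }
    (processed, epochLog)
  }
}
```

Here no message ever waits for a log write, and after a failure the job can resume from the offset recorded for the most recent Epoch.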

Interestingly, the log is often referred to as the Write Ahead Log on the internet. However, I think this name may not be appropriate because the messages being prepared for logging have already been consumed and processed by the engine. In batch mode, the log is written before processing the data, whereas in continuous mode, it is the other way around - the data is processed first, and then the log is written. Therefore, the term “Write After Log” may be more suitable for the log in continuous mode.

Let’s use a comparison method to deepen our understanding. Next, we will use a timeline that shows the arrival of messages from the source to the Structured Streaming engine to illustrate the processing delay in continuous mode.

Image

As can be seen, messages can be consumed and processed by the Structured Streaming engine immediately after they are generated in the source. Therefore, in terms of latency, better guarantees can be achieved. The Epoch Markers help the engine identify the latest processed message, allowing the corresponding Offset to be recorded in the log for failure recovery.

Key Takeaways #

Up to this point, we have covered all the content for today. Let’s summarize together.

In today’s lecture, we learned about two different computation models in Structured Streaming: Batch mode and Continuous mode. It is important to understand the characteristics of each mode, such as throughput, latency, and fault-tolerance, in order to make better choices when facing different streaming computing scenarios in our daily work.

In Batch mode, Structured Streaming divides the data stream into micro-batches. For each micro-batch, the engine creates a corresponding job and delivers it to Spark SQL and Spark Core for optimization and execution.

The characteristic of Batch mode is high throughput, but the end-to-end latency is relatively high as well, usually in the range of seconds. The high latency of Batch mode comes from both the job scheduling itself and its fault-tolerance mechanism, which requires writing the WAL (Write Ahead Log) in advance as part of the Checkpoint mechanism.

To achieve lower processing latency, you can use the Continuous mode computation model in Structured Streaming. In Continuous mode, the engine creates a long-running job to consume and serve all messages from the source.

In this case, the Continuous mode naturally avoids the computational overhead introduced by frequent job generation and scheduling. At the same time, by using Epoch Marker and processing data before logging, Continuous mode further eliminates the latency impact caused by fault-tolerance.

Both Batch mode and Continuous mode have their advantages and limitations. Batch mode excels in throughput, while Continuous mode achieves millisecond-level latency.

However, it is worth noting that currently, in Continuous mode, Structured Streaming only supports non-aggregation operations, such as map, filter, flatMap, and so on. Aggregation operations, such as grouping and counting in “Streaming Word Count”, are not supported in Continuous mode for now. This limitation may restrict the application scope of Continuous mode, so please pay special attention to it.
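For reference, a query that stays within this limitation might look like the sketch below. It assumes Spark’s built-in rate source (which emits `timestamp` and `value` columns) as a test input and the console as the sink. The application name, the local master setting, and the option values are illustrative choices, not recommendations.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("continuous-sketch") // hypothetical application name
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // The "rate" test source generates rows with `timestamp` and `value` columns.
    val stream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .option("numPartitions", "1")
      .load()

    // Only map-like operations: a selection (filter) and a projection (select).
    // No groupBy or count, which Continuous mode does not support.
    val evens = stream
      .filter($"value" % 2 === 0)
      .select($"value", $"timestamp")

    evens.writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()
      .awaitTermination()
  }
}
```

Replacing the filter and select with the grouping and counting from “Streaming Word Count” would make this query incompatible with the Continuous Trigger.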

Practice for Every Lesson #

Batch mode writes the WAL in advance for fault tolerance. Can you think of a way to borrow the approach used in Continuous mode, where data is processed first and then logged, and move the logging action in Batch mode to after data consumption and processing?

Feel free to discuss and exchange ideas with me in the comment section. I also encourage you to share the content of this lesson with more friends.