30 Structured Streaming Starting With the Streaming Word Count

Hello, I am Wu Lei.

Starting with today’s lesson, we enter the stream computing module. More than ever before, today’s big data processing places increasingly high demands on latency. Therefore, the basic concepts and working principles of stream processing are essential “skills” for every big data practitioner.

In this module, as usual, we will start with a hands-on example to give you a preliminary understanding of Spark’s stream processing framework, Structured Streaming. Then, we will delve into the working principles, best practices, and development considerations of Structured Streaming based on the capabilities and features provided by the framework.

In the first module of this column, we focused on Word Count, learning various data processing techniques by reading file content in batch mode. In today’s lesson, we will take a different approach and start with a “streaming Word Count” to see how Word Count is done in a stream computing framework.

Environment Setup #

To get started with today’s exercise, you only need to have a local Spark environment and you don’t need a distributed physical cluster.

However, we need to provide input data to Spark in the form of a “stream”. Therefore, to complete today’s experiment, we need to open two command line terminals. One is used to start the spark-shell, and the other is used to open a socket port and input data, as shown in the following image.

Image

Image

Streaming Word Count #

Now that the environment is ready, let’s look at what exactly “Streaming Word Count” is.

In the previous Word Count, the data was given to Spark all at once as a file (wikiOfSpark.txt), which triggered a Job computation. In “Streaming Word Count”, the data reaches Spark incrementally, line by line, and each newly received line triggers a Job computation.

Specifically, we use the netcat tool to send data lines to the local 9999 port. The Spark streaming application listens to the local 9999 port and immediately triggers the execution of the computation logic once it receives a data entry. In our example, the computation logic is Word Count. After the computation is complete, the streaming application prints the result to the console.

Unlike a batch job, a streaming application can in theory run indefinitely as long as we don’t manually interrupt it. Take “Streaming Word Count” as an example: as long as we don’t forcibly interrupt it, it keeps listening on port 9999, receiving data from there, and processing it in real time.

Now that we understand what we need to do, let’s proceed step by step to implement it.

First, in the second terminal, enter the command “nc -lk 9999” to open a listening socket on local port 9999 using the netcat tool. Most Unix-like operating systems (Linux and macOS, for example) come with netcat pre-installed, so the command should usually work as is; if it is missing on your system, install netcat first.

Image

After entering the command, the cursor will continue to blink on the screen, indicating that the operating system is waiting for us to send data to the Socket address. Let’s set it aside for now and deal with it later after the streaming application is completed.

Next, in the second step, we enter the spark-shell local environment from the first terminal and start developing the streaming application. First, we import DataFrame and specify the host and port number that the application needs to listen to.

import org.apache.spark.sql.DataFrame

// Set the local address and port number to listen to
val host: String = "127.0.0.1"
val port: String = "9999"

Data Loading #

Next is the data loading step, where we create DataFrame using the readStream API of SparkSession.

// Create DataFrame from the listening address
var df: DataFrame = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .load()

If you carefully observe the code above, does it look familiar? That’s right, the readStream API is almost identical to the read API of SparkSession.

Image

As you can see, similar to the read API, the readStream API also consists of three basic elements:

  • format: specifies the type of data source for streaming processing
  • option: several options related to the data source
  • load: loads the data stream into Spark
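To make the parallel between the two APIs concrete, here is a minimal sketch that places them side by side. It assumes a spark-shell session, where `spark` is predefined; the temporary file and its two sample lines are made up for illustration and are not part of the lesson's data.

```scala
import java.nio.file.Files
import org.apache.spark.sql.DataFrame

// A throwaway text file so that the batch read has something to load (illustrative data)
val tmpFile = Files.createTempFile("wordcount", ".txt")
Files.write(tmpFile, "Apache Spark\nSpark Logo".getBytes("UTF-8"))

// Batch: read -> format -> load; the file is read once
val batchDF: DataFrame = spark.read
  .format("text")
  .load(tmpFile.toString)

// Streaming: readStream -> format -> option -> load; data keeps arriving
val streamDF: DataFrame = spark.readStream
  .format("socket")
  .option("host", "127.0.0.1")
  .option("port", "9999")
  .load()

// Both are DataFrames; isStreaming tells them apart
println(batchDF.isStreaming)   // false
println(streamDF.isStreaming)  // true
```

In spark-shell, multi-line chains like these are easiest to enter in `:paste` mode. Note that `load()` only describes the stream; nothing is read from the socket until a query is started.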

In the streaming computing scenario, there are three important basic concepts that we need to understand. They are Source, Streaming Processing Engine, and Sink. Source is the data source of streaming computing, where data is continuously generated. In contrast, Sink refers to the destination where data flows, i.e., the place where data is directed. We will discuss Sink in detail later when we talk about the writeStream API.

The Streaming Processing Engine is the focus of learning in this module, and we will delve into it later. Its role is obvious: to perform data processing during the data flow process, ensuring data integrity and consistency. The data processing here includes various types of operations we have learned in the Spark SQL module, such as filtering, projecting, grouping, aggregating, and sorting, etc.

Now, let’s focus on the readStream API and Source. Through the format function of the readStream API, we can specify different types of data sources. In the Structured Streaming framework, Spark mainly supports three types of data sources: Socket, File, and Kafka.

Among them, the Socket type is mainly used for developing testing applications or testing connectivity. That’s why we use Socket as the data source in this lesson. File refers to the file system, where Spark can treat files flowing into a folder as a data stream. In actual industrial applications, the combination of Kafka + Spark is most common. Therefore, in the last part of this module, we will create a separate tutorial specifically discussing the best practices for integrating Kafka and Spark.
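For comparison with the Socket source, the following is a rough sketch of what the File and Kafka sources might look like through the same readStream API. It assumes a spark-shell session; the temporary input directory, the one-column schema, the broker address `localhost:9092`, and the topic name `words` are all placeholders chosen for illustration. The Kafka part additionally assumes that the `spark-sql-kafka-0-10` package is on the classpath.

```scala
import java.nio.file.Files
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// File source: files that land in the directory are treated as new stream data.
// The directory is a temporary placeholder; the schema is an assumption.
val inputDir: String = Files.createTempDirectory("streaming-input").toString
val lineSchema = StructType(Seq(StructField("value", StringType)))

val fileDF: DataFrame = spark.readStream
  .format("csv")
  .schema(lineSchema) // streaming file sources generally need an explicit schema
  .load(inputDir)

// Kafka source: needs the spark-sql-kafka-0-10 package.
// Broker address and topic name are hypothetical.
val kafkaDF: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "words")
  .load()
  // Kafka delivers the payload as binary, so cast it to a string column
  .selectExpr("CAST(value AS STRING) AS value")
```

Whichever source is chosen, the result is a streaming DataFrame, so the downstream processing code can stay the same.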

After specifying the data source using the format, we also need to use zero or more options to specify the specific address and access permissions of the data source. For example, in our code using Socket, we need to specify the host and port addresses.

// Create DataFrame from the listening address
var df: DataFrame = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .load()

Once everything is ready, we can use load to create DataFrame and continuously load the data stream into the Spark system.

Data Processing #

With DataFrame in hand, we can use the various DataFrame operators we have learned before to implement the Word Count computation logic. This step is relatively simple, so why not try it yourself first before reading on?

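/**
  * Use DataFrame API to calculate Word Count
  */
// split and explode come from org.apache.spark.sql.functions
// (spark-shell imports them automatically; the explicit import is harmless)
import org.apache.spark.sql.functions.{explode, split}

// In spark-shell, enter this multi-line chain in :paste mode
df = df
  // First, split the received string into an array of words, using a space as the delimiter
  .withColumn("words", split($"value", " "))
  // Flatten the array of words into individual words
  .withColumn("word", explode($"words"))
  // Group by word
  .groupBy("word")
  // Count the rows in each group
  .count()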

First of all, it should be noted that the DataFrame created from the socket by default only has a “value” column, which stores the data stream received from the socket on a row-by-row basis. For example, if we input two lines of data in the second terminal (netcat interface), namely “Apache Spark” and “Spark Logo”, then there will be two corresponding rows of data in the “value” column, which are “Apache Spark” and “Spark Logo”.

For the “value” column, we first split it into an array called “words” using spaces, and then flatten the “words” array into individual words using “explode”. Next, we perform a grouping and counting operation on the words. This part of the processing logic is relatively simple, and you can easily get started. I encourage you to try different operators to achieve the same logic.
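Because the same operators work on static DataFrames, one way to check the logic before wiring it to a stream is to run it on a small in-memory DataFrame. The sketch below assumes a spark-shell session and reuses the two sample lines mentioned above; the variable names `sample` and `counts` are only illustrative.

```scala
import org.apache.spark.sql.functions.{explode, split}
import spark.implicits._

// Static stand-in for the stream: one string column named "value"
val sample = Seq("Apache Spark", "Spark Logo").toDF("value")

val counts = sample
  // "Apache Spark" -> ["Apache", "Spark"]
  .withColumn("words", split($"value", " "))
  // One output row per word
  .withColumn("word", explode($"words"))
  .groupBy("word")
  .count()
  // Sort only to make the printed result stable
  .orderBy("word")

// Apache -> 1, Logo -> 1, Spark -> 2
counts.show()
```

The sort is added only for readable output here; it is not part of the streaming logic.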

Data Output #

After the data processing is completed, corresponding to the readStream API, we can use the writeStream API to write the processed results to a sink. In the Structured Streaming framework, Spark supports multiple sink types, including Console, File, Kafka, and Foreach(Batch). The differences and characteristics of these sinks will be discussed in the next lecture.

image

Now let’s talk about Console. Console is what we commonly refer to as the terminal. When we choose Console as the sink, Spark will print the results to the terminal. Therefore, Console is often used in conjunction with Socket for development experiments and testing connectivity. The code implementation is shown below.

/**
  * Write Word Count results to the Console
  */
df.writeStream
  // Specify the Sink as Console
  .format("console")
  // Specify output options
  .option("truncate", false)
  // Specify the output mode
  .outputMode("complete")
  //.outputMode("update")
  // Start the streaming application
  .start()
  // Wait for termination command
  .awaitTermination()

As you can see, the writeStream API looks very similar to the write API of DataFrame.

image

In the code, the format is used to specify the Sink type, while option is used to specify output options related to the Sink type. For example, the “truncate” option corresponds to Console and indicates whether the output content needs to be truncated. In the write API, we ultimately use save to store the data in the specified path. In the writeStream API, we use start to start end-to-end stream computing.

End-to-end stream computing refers to the three computing stages implemented in the “Streaming Word Count” application: continuously loading data streams from the data source, processing the data with the Word Count logic, and finally printing the results to the Console. The entire process runs without interruption. Even if there is no input from the netcat terminal, the “Streaming Word Count” application keeps running until we force it to quit. This is what the awaitTermination function is for: it blocks the calling thread until the streaming query terminates, whether because it is stopped or because it fails with an error, so the application keeps running instead of ending right after start returns.
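As a small illustration of how start and awaitTermination relate, the sketch below keeps the handle that start returns instead of chaining the two calls. It assumes a spark-shell session and that netcat is already listening on port 9999 (otherwise the query fails to connect); the names `lines`, `wordCounts`, and `query`, as well as the 60-second timeout, are arbitrary choices for the example.

```scala
import org.apache.spark.sql.functions.{explode, split}
import org.apache.spark.sql.streaming.StreamingQuery
import spark.implicits._

// Same source and logic as in the lesson, under different variable names
val lines = spark.readStream
  .format("socket")
  .option("host", "127.0.0.1")
  .option("port", "9999")
  .load()

val wordCounts = lines
  .select(explode(split($"value", " ")).as("word"))
  .groupBy("word")
  .count()

// start() returns immediately with a handle to the running query
val query: StreamingQuery = wordCounts.writeStream
  .format("console")
  .option("truncate", false)
  .outputMode("complete")
  .start()

println(query.isActive) // true while the query is running
println(query.status)   // current state of the query

// Block for at most 60 seconds; returns true if the query terminated in that time
val finished: Boolean = query.awaitTermination(60 * 1000L)

// Stop the query explicitly if it is still running
if (!finished) query.stop()
```

Calling awaitTermination without an argument, as the lesson does, simply blocks with no time limit.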

Besides the start and awaitTermination functions mentioned above, there is one more difference between the writeStream API and the write API. If you are observant, you may have noticed that the writeStream API has an additional function called outputMode, which is used to specify the output mode of the data stream.

To understand this function, we need to know what output modes are supported by Structured Streaming. Let’s first talk about the output modes supported by Structured Streaming in general, and then use the execution results of the “Streaming Word Count” to make a visual comparison.

In general, Structured Streaming supports three types of sink output modes:

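  • Complete mode: Outputs the entire result computed so far, that is, all processed content up to the current moment.
  • Append mode: Outputs only the new result rows added since the last trigger; rows that have already been output are never changed.
  • Update mode: Outputs only the result rows that have been updated since the last trigger.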

Of course, these three modes are not always applicable in any scenario. For example, in our “Streaming Word Count” example, the Append mode is not applicable. The reason is that for stream processing with aggregation logic, developers must provide a watermark in order to use the Append mode.

In the 32nd lesson, we will continue to learn about watermarks and the three output modes of the sink. For now, it’s good to have a general impression.

Execution Results #

So far, the code for the “Streaming Word Count” application has been fully developed. Next, let’s run it to experience the charm of streaming computation. Then, we will replace “complete” in the outputMode with “update” to visually compare their characteristics and differences.

To run the “Streaming Word Count” application, the first step is to enter all the code we just implemented into the spark-shell of the first terminal one by one. After entering all the code, wait for a moment, and you should see the following screen:

Image

When the word “Batch: 0” appears, it means that our streaming processing application has been successfully executed and is waiting for the data stream to be entered at port 9999. Next, switch to the second terminal, which is the netcat terminal interface, and then enter the following text content, line by line (note! line by line!). Please wait 3 to 5 seconds between each data entry.

Image

Then, switch the screen back to the spark-shell terminal. You will see that Spark has executed 4 batches of jobs, and the execution results are as follows.

Image

As you can see, in Complete mode, each batch’s computation result contains all the data processed by the system up to that point. You can verify this by comparing the differences between each batch and the previous batches.

Next, in the spark-shell terminal, enter the interrupt command (ctrl + D or ctrl + C) to exit the spark-shell. Then enter the “spark-shell” command again in the terminal to reenter the local spark-shell environment, and enter the code for the “Streaming Word Count” again. However, this time, at the end of the code, we change the outputMode in writeStream from “complete” to “update”.

After entering the code, let’s switch back to the netcat terminal and enter the 4 data records again, then observe the execution results in the first terminal’s spark-shell interface.

Image

As can be seen from the comparison, in Update mode, each batch only outputs the data records that have changed. The so-called changes mean that either the word is first entered in this batch and the count is 1, or the word is repeatedly entered and the count has changed. You can verify this by observing the output of different batches and comparing the output results in Update and Complete modes.
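The difference between the two modes can also be reproduced without Spark. The following plain-Scala sketch only simulates the bookkeeping; it is not how Structured Streaming is implemented. Each inner sequence stands for the lines received in one batch, and the sample lines, the object name `OutputModeDemo`, and the function `simulate` are made up for illustration.

```scala
object OutputModeDemo {
  type Counts = Map[String, Long]

  // For each input batch, returns (Complete output, Update output)
  def simulate(batches: Seq[Seq[String]]): Seq[(Counts, Counts)] = {
    val empty: Counts = Map.empty
    batches
      .scanLeft((empty, empty)) { case ((total, _), lines) =>
        // Word counts contributed by this batch alone
        val delta: Counts = lines
          .flatMap(_.split(" "))
          .groupBy(identity)
          .map { case (word, occurrences) => word -> occurrences.size.toLong }
        // Running totals after merging this batch
        val merged: Counts = delta.foldLeft(total) { case (acc, (word, n)) =>
          acc.updated(word, acc.getOrElse(word, 0L) + n)
        }
        // Update mode: only the words whose count changed in this batch
        val changed: Counts = merged.filter { case (word, _) => delta.contains(word) }
        (merged, changed)
      }
      .tail // drop the initial empty state
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("Apache Spark"), Seq("Spark Logo"))
    simulate(batches).zipWithIndex.foreach { case ((complete, update), i) =>
      println(s"Input ${i + 1} complete: $complete")
      println(s"Input ${i + 1} update:   $update")
    }
  }
}
```

For the second input, the Complete output contains all three words (Apache, Spark, Logo), while the Update output contains only Spark and Logo, because the count of Apache did not change.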

Well, so far we have developed a streaming application called “Streaming Word Count” and together we have examined its computation results in different output modes. Congratulations! With what you have learned so far, you can say that one foot has stepped into the door of Spark streaming computation. There are still many exciting things ahead waiting for us to explore together. Let’s keep up the good work!

Key Points Review #

In today’s lecture, you need to grasp the following points. Firstly, you need to be familiar with three important concepts in the stream processing scenario, which are Source, stream processing engine, and Sink, as shown in the following figure.

Image

Furthermore, for Source and Sink, you need to know what specific support Spark can provide under the Structured Streaming framework. For example, for Source, Spark supports Socket, File, and Kafka, while for Sink, Spark supports Console, File, Kafka, and Foreach (Batch).

Next, using a concrete stream processing application, we walked through the general process of developing stream processing applications under the Structured Streaming framework. Generally, we use the readStream API to read data streams from different types of Sources and create DataFrames. We then use DataFrame operators to process the data, such as filtering, projecting, grouping, and aggregating. Finally, we use the writeStream API to write the processing results to different types of Sink.
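The three steps can be put together in one place. Below is a minimal sketch of the same application written as a standalone program rather than typed into spark-shell; the object name `StreamingWordCount`, the helper `wordCount`, and the application name are assumptions made for this example. It expects netcat to be listening on port 9999 when it starts.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, split}

object StreamingWordCount {

  // Processing step as a plain function; it accepts batch or streaming input
  def wordCount(lines: DataFrame): DataFrame =
    lines
      .select(explode(split(col("value"), " ")).as("word"))
      .groupBy("word")
      .count()

  def main(args: Array[String]): Unit = {
    // Outside spark-shell, the SparkSession has to be created explicitly
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("StreamingWordCount")
      .getOrCreate()

    // 1. Source: read lines from the local socket
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "127.0.0.1")
      .option("port", "9999")
      .load()

    // 2. Processing, then 3. Sink: print the running counts to the console
    wordCount(lines).writeStream
      .format("console")
      .option("truncate", false)
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}
```

Keeping the processing step in its own function is a design choice of this sketch: it lets the Word Count logic be checked on a small static DataFrame before it is attached to a stream.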

Finally, for result output, we need to understand that Structured Streaming supports different output modes in different scenarios. There are three main output modes, which are Complete mode, Append mode, and Update mode. Among them, in Complete mode, all data processed up to now is output, while in Update mode, only the updated data in the current batch is output.

Practice for Each Lesson #

When running “Streaming Word Count”, we emphasized entering the data line by line, in order. This time, copy all 4 lines of data from the example and paste them into netcat at once. Then observe the results given by Structured Streaming and compare them with the previous results.

Feel free to interact with me in the comments section and I also encourage you to share today’s content with more colleagues and friends. Let’s build this Word Count streaming application together.