34 Spark and Kafka in Stream Computing the Panacea

34 Spark and Kafka in Stream Computing - The Panacea #

Hello, I am Wu Lei.

In the previous lectures, we have mentioned multiple times that Kafka is one of the most important sources for Structured Streaming when it comes to data sources. In industrial-grade production systems, the combination of Kafka and Spark is very common. Therefore, it is crucial for students who want to work in the field of stream computing to master the integration of Kafka and Spark.

In today’s lecture, we will discuss how to use the versatile combination of Spark and Kafka with examples. As business rapidly develops, the scale of clusters in various companies is constantly growing. With the significant increase in cluster scale, resource utilization has become a focal point of attention. After all, whether it is a self-built data center or a public cloud, each machine represents a substantial investment.

Example: Real-time Calculation of Resource Utilization #

In today’s example, we are going to talk about the real-time calculation of resource utilization. Specifically, we first need to collect the resource utilization (CPU, memory) of each machine in the cluster and write it to Kafka. Then, we use Spark’s Structured Streaming to consume the Kafka data stream and perform preliminary analysis and aggregation on the resource utilization data. Finally, through Structured Streaming, we print the aggregated results to the console and write them back to Kafka, as shown in the following diagram.

Image

Generally speaking, in industrial-grade applications, each rounded rectangle in the above diagram is deployed independently. The green rectangle represents the server cluster to be monitored, the blue rectangle represents the independently deployed Kafka cluster, and the red Spark cluster is also deployed independently. Independent deployment means that the clusters do not share machine resources, as shown in the following diagram.

Image

If you don’t have such a deployment environment at hand, don’t worry. To complete the example of real-time calculation of resource utilization, you don’t have to rely on independently deployed distributed clusters. In fact, you can reproduce today’s example in a single machine environment.

Course Schedule #

Today’s lecture involves a lot of content. Before we officially start the course, let’s first outline the course content so that you have a clear understanding of it.

image

Regarding the four steps shown in the figure above, we will combine them with code implementation and explain the following four steps respectively:

  1. Generate CPU and memory consumption data streams and write them into Kafka;
  2. Structured Streaming consumes Kafka data and performs preliminary aggregation;
  3. Structured Streaming prints the calculation results to the terminal;
  4. Structured Streaming writes the calculation results back to Kafka for future use.

In addition, in order to accommodate students who are not familiar with Kafka, we will also provide a brief overview of Kafka installation, topic creation and consumption, as well as basic Kafka concepts.

Read quickly about Kafka’s architecture and operation mechanism #

Before we dive into the computational aspect mentioned earlier, let’s understand the core functionalities provided by Kafka.

In the big data streaming ecosystem, Kafka is the most widely used messaging middleware. The core functionalities of messaging middleware are as follows:

  1. Connect message producers and message consumers.
  2. Cache messages (or events) produced by producers.
  3. Enable consumers to access messages with minimal latency.

The term “message producer” refers to the source and channel of events or messages. In our example, the monitored cluster is the producer. The machines in the cluster continuously produce resource utilization messages. On the other hand, the term “message consumer” refers to the system that accesses and processes messages. In this lecture’s example, the consumer is Spark. Structured Streaming reads and processes resource utilization messages from Kafka, performing aggregation and summarization.

From the analysis so far, we can see that the existence of messaging middleware brings three benefits to the interaction between producers and consumers:

  • Decoupling: Both sides do not need to be aware of each other’s existence, except for the messages themselves.
  • Asynchrony: Both sides can produce or consume messages at their own “pace” and “rhythm” without being limited by each other’s processing capacity.
  • Load balancing: When a consumer subscribes to messages from multiple producers and multiple producers simultaneously generate a large number of messages, asynchronous mode allows the consumer to flexibly consume and process messages, thereby avoiding the risk of overloading computational resources.

Now that we understand Kafka’s core functions and features, let’s talk about Kafka’s system architecture. Unlike most master-slave architecture big data components (such as HDFS, YARN, Spark, Presto, Flink, etc.), Kafka follows a leaderless architecture. This means that in a Kafka cluster, there is no master role to maintain the global data state.

Each server in the cluster is called a Kafka broker. The broker’s responsibility is to store messages produced by producers and provide data access for consumers. Brokers are independent of each other and have no dependencies between them.

Instead of simply explaining Kafka’s architecture in a straightforward manner, which may make you fall asleep, let’s use a diagram to provide a more intuitive explanation of key concepts in Kafka.

Image

As mentioned earlier, Kafka relies on ZooKeeper to store and maintain global metadata. Metadata refers to the distribution and status of messages in the Kafka cluster. Logically, messages belong to one or more topics. In the diagram above, all the blue rounded rectangles represent messages belonging to Topic A, while the green rounded rectangles belong to Topic B.

In our resource utilization example, we will create two topics: “cpu-monitor” for CPU utilization and “mem-monitor” for memory utilization. When a producer writes a message to Kafka, it needs to specify which topic the message belongs to. For example, monitoring data related to CPU should be sent to “cpu-monitor,” while memory monitoring data should be sent to “mem-monitor.”

To balance the workload across different brokers, messages in the same topic are stored at the partition level on physical disks. The rounded rectangles in the diagram represent individual data partitions. In Kafka, a partition is actually a directory on the disk, and messages are stored sequentially in files within the partition directory.

To provide high availability (HA) for data access, after a producer writes a message to the leader partition, Kafka synchronizes the message to multiple partition replicas (followers). Steps 1 and 2 in the diagram demonstrate this process.

By default, consumers pull and consume data from the leader partition, as shown in step 3. When the leader partition fails and the data becomes unavailable, Kafka selects a new leader from the remaining partition replicas to serve external requests. This process is known as “leader election.”

That’s it! We have covered the basic functionalities and operation mechanism of Kafka. Although these introductions may not cover the full scope of Kafka, they are sufficient for beginners to move on to practical implementation and integration with Spark and Kafka.

Integration of Kafka and Spark #

Next, let’s walk you through the process of integrating Kafka and Spark step by step using the example of “real-time resource utilization calculation”. First, in the first step, let’s prepare the Kafka environment.

Kafka Environment Preparation #

To configure the Kafka environment, we only need three simple steps:

  1. Install ZooKeeper, install Kafka, and start ZooKeeper;
  2. Modify the Kafka configuration file server.properties and set the ZooKeeper-related configuration items;
  3. Start Kafka and create topics.

First, let’s download the installation packages for ZooKeeper and Kafka from the ZooKeeper official website and Kafka official website respectively. Then, unzip and install the packages, and configure the relevant environment variables as shown in the table below:

// Download the ZooKeeper installation package
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
// Download the Kafka installation package
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz

// Extract and install ZooKeeper to the specified directory
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz -C /opt/zookeeper
// Extract and install Kafka to the specified directory
tar -zxvf kafka_2.12-2.8.0.tgz -C /opt/kafka

// Edit the environment variables
vi ~/.bash_profile
/** Enter the following content into the file
export ZOOKEEPER_HOME=/opt/zookeeper/apache-zookeeper-3.7.0-bin
export KAFKA_HOME=/opt/kafka/kafka_2.12-2.8.0
export PATH=$PATH:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin
*/

// Start ZooKeeper
zkServer.sh start

Next, we open the server.properties file in the Kafka configuration directory (i.e., $KAFKA_HOME/config), and set the zookeeper.connect configuration item to “hostname:2181”, which is the hostname plus the port number.

If you have installed ZooKeeper and Kafka on the same node, the hostname can be set to localhost. If it is a distributed deployment, the hostname should be the installation node where ZooKeeper is located. Generally, ZooKeeper uses port 2181 to provide services by default, so we can use the default port here.

After the configuration file is set, we can use the following command to start Kafka Broker on multiple nodes:

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

After Kafka is started, let’s create the two topics we mentioned earlier: cpu-monitor and mem-monitor, which are used to store CPU utilization messages and memory utilization messages, respectively.

kafka-topics.sh --zookeeper hostname:2181/kafka --create
--topic cpu-monitor
--replication-factor 3
--partitions 1

kafka-topics.sh --zookeeper hostname:2181/kafka --create
--topic mem-monitor
--replication-factor 3
--partitions 1

How is it? Isn’t it easy? To create a topic, you only need to specify the ZooKeeper service address, topic name, and replica count. However, here, it is important to note that the replication factor cannot exceed the number of Brokers in the cluster. So, if you are deploying locally, that is, all services are deployed on one node, then the replication factor should be set to 1.

Well, that’s it for installing and configuring the Kafka environment. Next, we should let the producer produce resource utilization messages and continuously inject them into the Kafka cluster.

Message Production #

In our example, what we need to do is monitor the resource utilization of each machine in the cluster. Therefore, we need these machines to periodically send out CPU and memory utilization rates. To achieve this, we only need to complete the following two necessary steps:

  1. Each node collects CPU and memory usage data from itself;
  2. Send the collected data to the Kafka cluster at a fixed interval.

As the code for this part is relatively long and our focus is on learning the integration of Kafka and Spark, here we only provide key code fragments involved in these two steps. You can download the complete code implementation from here.

import java.lang.management.ManagementFactory
import java.lang.reflect.Modifier

def getUsage(methodName: String): Any = {
    // Get the OperatingSystem Java Bean
    val operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean
    // Get the declared methods in the OperatingSystem object
    for (method <- operatingSystemMXBean.getClass.getDeclaredMethods) {
        method.setAccessible(true)
        // Check if it is the desired method name
        if (method.getName.startsWith(methodName) && Modifier.isPublic(method.getModifiers)) {
            // Invoke and execute the method to obtain the utilization rate of the specified resource (CPU or memory)
            return method.invoke(operatingSystemMXBean)
        }
    }
    throw new Exception(s"Can not reflect method: ${methodName}")
}

// Get the CPU utilization rate
def getCPUUsage(): String = {
    var usage = 0.0
    try {
        // Call the getUsage method with the "getSystemCpuLoad" parameter to get the CPU utilization rate
        usage = getUsage("getSystemCpuLoad").asInstanceOf[Double] * 100
    } catch {
        case e: Exception => throw e
    }
    usage.toString
}

// Get the memory utilization rate
def getMemoryUsage(): String = {
    var freeMemory = 0L
    var totalMemory = 0L
    var usage = 0.0
    try {
        // Call the getUsage method with the relevant memory parameters to get the memory utilization rate
        freeMemory = getUsage("getFreePhysicalMemorySize").asInstanceOf[Long]
        totalMemory = getUsage("getTotalPhysicalMemorySize").asInstanceOf[Long]
        // Calculate the current memory usage by subtracting the free memory from the total memory
        usage = (totalMemory - freeMemory.toDouble) / totalMemory * 100
    } catch {
        case e: Exception => throw e
    }
    usage.toString
}

Using Java’s reflection mechanism to obtain resource utilization

In the above code, the CPU and memory utilization rates are obtained. The most critical part of this code is to use Java’s reflection mechanism to obtain the various public methods of the operating system object, and then call these public methods to obtain the resource utilization rates.

However, you may say, “I don’t understand Java’s reflection mechanism, and I don’t understand the above code.” That’s okay. As long as you can understand the calculation logic of the above code in combination with the comments, it will be fine. After obtaining the data for the resource utilization rates, we can send them to Kafka.

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Initialize the properties
def initConfig(clientID: String): Properties = {
    val props = new Properties
    // Set the necessary properties
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server1:9092,kafka-server2:9092,kafka-server3:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientID)
    // Return the properties
    props
}

// Create a Kafka producer and send messages
def produceMessages(): Unit = {
    // Create a Kafka producer
    val producer = new KafkaProducer[String, String](initConfig("client-1"))

    try {
        // Get the CPU and memory utilization rates
        val cpuUsage = getCPUUsage()
        val memoryUsage = getMemoryUsage()

        // Create a ProducerRecord and send the messages
        val record = new ProducerRecord[String, String]("cpu-monitor", "cpu-usage", cpuUsage)
        producer.send(record)

        val record = new ProducerRecord[String, String]("mem-monitor", "memory-usage", memoryUsage)
        producer.send(record)
    } finally {
        // Close the Kafka producer
        producer.close()
    }
}

In the above code, we initialize the Kafka producer’s properties and create a producer with the specified client ID. Then, we call the previous getCPUUsage() and getMemoryUsage() methods to obtain the CPU and memory utilization rates, and finally create ProducerRecord objects to send these messages to the Kafka cluster.

These are the key code fragments involved in the two necessary steps. The complete implementation of the code can be downloaded from here.

Please note that the code provided here is just for illustration purposes, and you may need to modify it according to your specific needs.

val brokerList = "localhost:9092"
// Specify the Kafka cluster broker list
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientID)
props
}

val clientID = "usage.monitor.client"
val cpuTopic = "cpu-monitor"
val memTopic = "mem-monitor"

// Define properties, including Kafka cluster information, serialization method, etc.
val props = initConfig(clientID)
// Define a Kafka Producer object for sending messages
val producer = new KafkaProducer[String, String](props)
// Callback function, can be ignored temporarily
val usageCallback = _

while (true) {
  var cpuUsage = new String
  var memoryUsage = new String
  // Call the previously defined functions to get CPU and memory usage
  cpuUsage = getCPUUsage()
  memoryUsage = getMemoryUsage()

  // Generate Kafka messages for the CPU topic
  val cpuRecord = new ProducerRecord[String, String](cpuTopic, clientID, cpuUsage)
  // Generate Kafka messages for the memory topic
  val memRecord = new ProducerRecord[String, String](memTopic, clientID, memoryUsage)
  // Send CPU usage messages to the Kafka cluster
  producer.send(cpuRecord, usageCallback)
  // Send memory usage messages to the Kafka cluster
  producer.send(memRecord, usageCallback)
  // Set the sending interval to 2 seconds
  Thread.sleep(2000)
}

From the above code, we can see that there are three key steps:

  • Define Kafka Producer object, where we need to specify Kafka cluster information in the properties.
  • Call the previously defined functions getCPUUsage and getMemoryUsage to get CPU and memory resource usage.
  • Package resource usage as messages and send them to the corresponding topics.
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

dfCPU.writeStream
.outputMode("Complete")
// Use Console as the Sink
.format("console")
// Trigger a Micro-batch every 10 seconds
.trigger(Trigger.ProcessingTime(10.seconds))
.start()
.awaitTermination()

Using the above code, we can directly observe the Kafka messages obtained by Structured Streaming through the terminal, thus establishing a perceptual understanding of the urgently processed messages, as shown in the following figure.

Image

In the above data, besides the Key and Value, all other information is the metadata of the message, i.e., the Topic to which the message belongs, the partition it is in, the offset address of the message, and the time it is recorded in Kafka, etc.

In our example, the Key corresponds to the server node that sends the resource utilization rate data, while the Value is the specific CPU or memory utilization rate. After familiarizing ourselves with the schema and composition of the messages, we can proceed to process these real-time data streams in a targeted manner.

For the resource utilization rate data generated every two seconds, suppose we only care about their average value within a certain period of time (such as 10 seconds). In this case, we can use the Trigger and aggregation calculation to achieve this. The code is as follows.

import org.apache.spark.sql.types.StringType

dfCPU
.withColumn("clientName", $"key".cast(StringType))
.withColumn("cpuUsage", $"value".cast(StringType))
// Group by server
.groupBy($"clientName")
// Calculate the average value
.agg(avg($"cpuUsage").cast(StringType).alias("avgCPUUsage"))
.writeStream
.outputMode("Complete")
// Use Console as the Sink
.format("console")
// Trigger a Micro-batch every 10 seconds
.trigger(Trigger.ProcessingTime(10.seconds))
.start()
.awaitTermination()

As shown above, we use the Fixed interval trigger to create a Micro-batch every 10 seconds. Then, in a Micro-batch, we group by the server that sent the message and calculate the average CPU utilization rate. Finally, we print the statistical results to the terminal, as shown in the following figure.

Image

Writing back to Kafka #

In fact, in addition to printing the results to the terminal, we can also write them back to Kafka. We know that Structured Streaming supports a wide variety of sinks, including File, Kafka, Console (used for testing), and Foreach (Batch), etc. Writing data back to Kafka is not difficult. We just need to specify the format as Kafka and set the corresponding options in the writeStream API, as shown in the table below.

dfCPU
.withColumn("key", $"key".cast(StringType))
.withColumn("value", $"value".cast(StringType))
.groupBy($"key")
.agg(avg($"value").cast(StringType).alias("value"))
.writeStream
.outputMode("Complete")
// Specify Sink as Kafka
.format("kafka")
// Set Kafka cluster information, in this example there is only one Kafka Broker: localhost
.option("kafka.bootstrap.servers", "localhost:9092")
// Specify the Kafka Topic to be written to, the topic "cpu-monitor-agg-result" needs to be created in advance
.option("topic", "cpu-monitor-agg-result")
// Specify the WAL Checkpoint directory address
.option("checkpointLocation", "/tmp/checkpoint")
.trigger(Trigger.ProcessingTime(10.seconds))
.start()
.awaitTermination()

First, we specify the Sink as Kafka, and then use the option parameter to set the Kafka cluster information, the name of the topic to be written to, and the WAL Checkpoint directory. When typing the above code in spark-shell, Structured Streaming will fetch the original utilization rate information (Topic: cpu-monitor) from Kafka every 10 seconds and then perform group by and aggregation on the server. Finally, it will write the aggregated results back to Kafka (Topic: cpu-monitor-agg-result).

There are two points to note here. First, the topics for reading and writing should be different to avoid confusion in logic and data. Second, as you may have noticed, the data written back to Kafka must use the “key” and “value” fields in the schema, instead of being able to flexibly define field names like “clientName” and “avgCPUUsage”, so pay special attention to this.

Key Review #

Alright, up to this point, I have walked you through the integration of Kafka and Spark, completing every step involved in the diagram, which includes producing messages, writing to Kafka, consuming and processing messages, and finally writing back to Kafka.

图片

Today’s content is quite extensive. In addition to mastering each step and usage in the integration, you also need to understand some basic concepts and features of Kafka. Kafka is the most widely used messaging middleware, and it has three core functionalities:

  1. Connects message producers and consumers.
  2. Buffers the messages (or events) produced by producers.
  3. Enables consumers to access messages with the lowest possible latency.

You don’t need to memorize Kafka’s basic concepts. When needed, you can refer back to the architectural diagram. In this diagram, Kafka’s basic concepts are clearly marked, as well as the simple process of message production, buffering, and consumption.

图片

As for the integration of Kafka and Spark, whether it’s using Structured Streaming to consume Kafka messages through the readStream API or using the writeStream API to write computation results to Kafka, you just need to remember the following points to easily set up this versatile combination:

  • Specify Kafka as the source or sink in the format function.
  • In the options, specify the Kafka cluster broker for the kafka.bootstrap.servers key-value.
  • In the options, set the subscribe or topic to specify the Kafka topic for reading or writing.

Practice for Each Lesson #

Based on the CPU utilization code in this lesson, please complete each step shown in the diagram for memory utilization, including the production and writing of memory utilization messages to Kafka (Step 1), the consumption and calculation of messages (Steps 2 and 3), and the further writing of aggregated results to Kafka (Step 4).

Image

Feel free to forward today’s lesson to more colleagues and friends, and try out the Spark + Kafka example together. I’ll be waiting for your sharing in the comments section.