
41 Kafka Streams DSL Development Examples #

Hello, I’m Hu Xi. The topic I want to share with you today is: Kafka Streams DSL development examples.

DSL, which stands for Domain Specific Language, refers to a language specific to a particular domain. It provides a set of convenient APIs to help us implement stream data processing logic. Today, I will share some Kafka Streams DSL development methods and specific examples.

Introduction to Kafka Streams #

In the previous lecture, we mentioned that a stream processing platform is an engine dedicated to handling infinite datasets. As for Kafka Streams, it is merely a client library. The so-called Kafka Streams application is just a regular Java application that calls the Streams API. However, in Kafka Streams, the stream processing logic is represented using a topology.

A topology structure is essentially a directed acyclic graph (DAG), which consists of multiple processing nodes and edges that connect these nodes, as shown in the diagram below:

The nodes in the diagram are also referred to as processing units or processors, which encapsulate specific event processing logic. Processors are known as operators in other stream processing platforms. Common operators include transformation (map), filtering (filter), joining (join), and aggregation (aggregation), among others. Later, I will introduce several commonly used operators in detail.

Generally, Kafka Streams offers two main types of APIs to define processor logic.

The first type is the DSL I just mentioned, which is a declarative functional API that feels similar to SQL. You don’t have to worry about how it is implemented underneath; you just need to call specific APIs to tell Kafka Streams what you want to do.

Let’s take a look at a simple example to help you understand what the following code does.

movies.filter((title, movie) -> movie.getGenre().equals("Action")).xxx()...

Although this code uses a Java 8 lambda expression, its overall purpose should be clear: from all the Movie events, it keeps only those whose genre is “Action”. This is what the declarative style of the DSL looks like: you state what you want rather than how to compute it.

The second type is the imperative low-level API, known as the Processor API. Compared to the DSL, this set of APIs offers more flexibility: you can write custom operators to implement processing logic that the DSL does not provide out of the box. In fact, the DSL itself is implemented on top of the Processor API.

Currently, the Kafka Streams DSL provides a rich set of APIs, which can basically meet most of our processing logic requirements. Today, I will focus on introducing the usage of the DSL.

Regardless of which set of APIs you use, all stream processing applications can essentially be classified into two categories: stateful applications and stateless applications.

A stateful application is one that uses APIs such as joins, aggregations, or time windows (Window). Once these APIs are called, your application becomes stateful, which means you need Kafka Streams to store the application’s state for you.

A stateless application refers to an application where the processing result of a message does not affect or depend on the processing of other messages. Common stateless operations include event transformations and filtering, as demonstrated in the example above.

Key Concepts #

After learning about the background, you also need to understand some key concepts in the field of stream processing, including streams, tables, the duality of streams and tables, and time and time windows.

Duality of Streams and Tables #

Firstly, let me introduce the concepts of streams and tables in stream processing, as well as their relationship.

A stream is an endlessly flowing (at least theoretically) sequence of events, while a table, similar to the concept in a relational database, is a set of rows and records. In the field of stream processing, these two concepts are organically unified: streams are aggregated over time to form tables, while tables are continuously updated over time to form streams, which is known as the duality of streams and tables. The duality of streams and tables is one of the key reasons for the success of the Kafka framework in the field of stream processing.

The following diagram illustrates the process of transforming a table into a stream and vice versa.

Initially, the table contains only one record, “张三:1” (“Zhang San: 1”). When this record is converted into a stream, it becomes an event. Then, a new record, “李四:1” (“Li Si: 1”), is added to the table, and the stream adds a corresponding new event. Afterwards, the value for Zhang San in the table is updated from 1 to 2, and the stream again adds a corresponding update event. Finally, a new record, “王五:1” (“Wang Wu: 1”), is added to the table, and the stream adds one more event. This completes the process of transforming the table into a stream.

From this process, we can see that a stream can be seen as the changelog of a table, i.e., the change events of a table. Conversely, the process of transforming a stream into a table can be considered as the inverse of this process: we take a snapshot of each event in the stream to form a table.

The concepts of streams and tables are crucial in the field of stream processing. In the Kafka Streams DSL, streams are represented by KStream, while tables are represented by KTable.

Kafka Streams also defines a GlobalKTable. Like a KTable, it represents a table of change events. The most significant difference between the two is how data is read: when a Streams application reads a Kafka topic into a GlobalKTable, every application instance reads the data from all partitions of the topic, whereas with a KTable each instance only reads a subset of the partitions, depending on how many instances are running.
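To make these three abstractions a bit more concrete, here is a minimal sketch of how each is created from a topic with StreamsBuilder (the topic names are made up for illustration):

final StreamsBuilder builder = new StreamsBuilder();

// KStream: every record in the topic is an independent event in the stream.
final KStream<String, String> clicks = builder.stream("clicks-topic");

// KTable: records with the same key are treated as updates to that key; each
// application instance only materializes the partitions assigned to it.
final KTable<String, String> profiles = builder.table("user-profiles-topic");

// GlobalKTable: every application instance reads and materializes all partitions of the topic.
final GlobalKTable<String, String> products = builder.globalTable("products-topic");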

Time #

In the field of stream processing, it is critical to precisely define event time: on one hand, it is a prerequisite for the correctness of a stream processing application; on the other hand, operations such as time windows in stream processing rely on the concept of time to work properly.

Common time concepts fall into two categories: event time and processing time. Ideally, we hope that these two times are equal, which means events are processed immediately once they occur. However, in practical scenarios, this is not possible. Processing time always lags behind event time, and the degree of lag is highly variable and unpredictable, as shown in the figure below from the book “Streaming Systems”:

The dashed line in the figure represents the ideal state, where event time equals processing time. The pink curve represents the actual situation: processing time lags behind event time, and the amount of lag varies irregularly, with no fixed pattern.

If a stream processing application is to achieve correct results, it must use time windows based on event time instead of processing time.
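In Kafka Streams, which time a record carries is decided by a TimestampExtractor: by default the framework uses the timestamp stored in the Kafka record itself, and if your event time lives inside the message payload, you can plug in your own extractor. Below is a minimal, illustrative sketch that assumes a made-up convention where the value is a string prefixed with an epoch-millisecond timestamp; it is not tied to any particular example in this article.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PrefixTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        // Hypothetical convention: the value looks like "1620000000000 some payload".
        if (record.value() instanceof String) {
            final String text = (String) record.value();
            final int space = text.indexOf(' ');
            if (space > 0) {
                try {
                    return Long.parseLong(text.substring(0, space));
                } catch (final NumberFormatException ignored) {
                    // fall through and use the record's own timestamp
                }
            }
        }
        return record.timestamp();
    }
}

// Register it in the Streams configuration:
// props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, PrefixTimestampExtractor.class.getName());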

Time Windows #

The time window mechanism is the process of dividing stream data along the timeline. Common types of time windows include fixed windows, sliding windows, and session windows. Kafka Streams supports all three types of time windows. In the following examples, I will provide detailed explanations on how to use the Kafka Streams API to implement time window functionality.

Running the WordCount Example #

Alright, I have explained the basic concepts of Kafka Streams and its DSL. Now, I will provide a Hello World example in the field of big data processing: the WordCount program.

The first program to be implemented in almost every big data processing framework is usually word counting. Let’s see how the Kafka Streams DSL implements WordCount. I will first provide the complete code, and then I will explain the meaning of the key parts of the code and how to run it.

package kafkalearn.demo.wordcount;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public final class WordCountDemo {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-stream-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final StreamsBuilder builder = new StreamsBuilder();

        // Read the input topic as a stream of lines of space-separated words.
        final KStream<String, String> source = builder.stream("wordcount-input-topic");

        // Split each line into words, group by word, and count how often each word occurs.
        final KTable<String, Long> counts = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
            .groupBy((key, value) -> value)
            .count();

        // Turn the result table back into a stream and write it to the output topic.
        counts.toStream().to("wordcount-output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);

        // Register a JVM shutdown hook so the Streams application closes gracefully on exit.
        Runtime.getRuntime().addShutdownHook(new Thread("wordcount-stream-demo-jvm-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }

        System.exit(0);
    }
}

At the beginning of the program, I create a Properties instance and set the key parameters of the Kafka Streams program, such as the application id, the bootstrap servers, and the default key and value serdes. The application id is the unique identifier of a Kafka Streams application and must be specified explicitly; the default serdes tell Kafka Streams how to serialize and deserialize message keys and values.

Next, I create an instance of StreamsBuilder and use it to create a KStream. This KStream reads messages from a Kafka topic named “wordcount-input-topic”. The messages in this topic are composed of a group of words separated by spaces, such as “zhangsan lisi wangwu”.

Since we want to count the words, we need to extract the words from the messages. Based on the previous concepts I explained, you should be able to guess that a KTable is a suitable storage structure. Therefore, the next step is to convert the KStream into a KTable.

First, we split each line into words using the flatMapValues method; the lambda expression in the code implements the logic of extracting the words from the message value. Since splitting a line produces multiple words, we use flatMapValues rather than mapValues: the former “flattens” the resulting list so that each word becomes its own record, while the latter would keep the whole list as a single value, giving one record per line instead of one per word.
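If it helps to see the difference in the resulting types, here is a small comparison sketch (lines is a hypothetical KStream<String, String> and is not part of the program above):

// flatMapValues: each input line is flattened into several records, one per word.
final KStream<String, String> words =
        lines.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")));

// mapValues: each input line remains a single record, whose value is now the whole list of words.
final KStream<String, List<String>> wordLists =
        lines.mapValues(line -> Arrays.asList(line.toLowerCase().split(" ")));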

Once we have done all this, the program calls the groupBy method to group the words. Since we are counting, the same words must be grouped together. Then, we call the count method to calculate the count for each word and save it in a KTable object named “counts”.

Finally, we write the counting results back to Kafka. Since counts is a KTable rather than a stream, we first convert it into a KStream and then call the to method to write it to a topic named “wordcount-output-topic”. At this point, the key of each record in counts is a word and the value is its count, so when calling the to method we also specify the serdes for the key and value via Produced.with: the String serde and the Long serde, respectively.

With this, we have completed the logic of the Kafka Streams stream processing. The next step is to create an instance of KafkaStreams and start it. Usually, this part of the code is similar, as it involves calling the start method to start the entire stream processing application and configuring a JVM shutdown hook to gracefully close the stream processing application.

Overall, the way the Kafka Streams DSL implements WordCount is quite simple: with just a few operators, it easily achieves distributed, real-time word counting. In fact, most modern real-time stream processing frameworks adopt this design approach, simplifying development by providing a rich set of convenient, out-of-the-box operators so that users can assemble real-time computing applications as easily as stacking building blocks.

After launching this Java program, you need to create the corresponding input and output topics, and continuously write word lines in the format mentioned earlier to the input topic. Then, you need to run the following command to check if the output topic correctly counts the number of words you entered:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic wordcount-output-topic \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
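For reference, creating the two topics and feeding test lines into the input topic might look roughly like this (a sketch for a local single-broker setup; adjust partitions and replication to your environment, and note that older Kafka releases use --zookeeper instead of --bootstrap-server for kafka-topics.sh):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
    --topic wordcount-input-topic --partitions 1 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
    --topic wordcount-output-topic --partitions 1 --replication-factor 1

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wordcount-input-topic
> zhangsan lisi wangwu
> zhangsan lisi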

Development APIs #

After walking through a concrete example, let’s take a look at the rich set of APIs provided by Kafka Streams. We can focus on two aspects: common operators and the time window API.

Common Operators #

The richness and ease of use of its operators are important criteria for measuring how popular a stream processing framework is. The Kafka Streams DSL provides many out-of-the-box operators, which can be broadly divided into two categories: stateless operators and stateful operators. Let me introduce a few frequently used operators from each category.

Among the stateless operators, the filter operator is very commonly used. It performs the filtering logic. Taking WordCount as an example, let’s say we only want to count the number of words that start with the letter “s”. After performing flatMapValues, we can add a line of code as follows:

.filter((key, value) -> value.startsWith("s"))

Another commonly used stateless operator is the map family. Streams DSL provides many variants, such as map, mapValues, flatMap, and flatMapValues. We have already witnessed the power of flatMapValues, and the functionality of the other three is similar. The variants with “Values” only perform conversion on the message value without touching the message key, while the variants without “Values” can modify the message key.

For example, suppose the current message has no key, and the value is the word itself. Now suppose we want to change the message to a key-value pair: the key is the lowercase word, and the value is the length of the word. We can call the map method as follows:

KStream<String, Integer> transformed = stream.map(
    (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));

Finally, let me introduce a group of stateless operators used for debugging: print and peek. Streams DSL allows you to use these two methods to view the events in your message stream. The difference between the two is that print is a terminal operation. Once you call the print method, you cannot call any other methods afterwards. On the other hand, peek allows you to continue processing the message stream while viewing it. Here are two code snippets to illustrate the difference:

stream.print(Printed.toFile("streams.out").withLabel("debug"));
stream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value)).map(...);

Common stateful operators mainly involve aggregation operations such as counting, summing, averaging, and finding maximum and minimum values. Currently, the Streams DSL only provides a dedicated count method for counting; other aggregations have to be implemented yourself using the more general APIs, such as reduce or aggregate.

Suppose we have a message stream, where each event is an individual integer, and we want to sum the even numbers. Here’s how you can implement it in Streams DSL:

final KTable<Integer, Integer> sumOfEvenNumbers = input
         .filter((k, v) -> v % 2 == 0)
         .selectKey((k, v) -> 1)
         .groupByKey()
         .reduce((v1, v2) -> v1 + v2);

Let me briefly explain the selectKey call. Since we want to sum all the even numbers across events, the messages must all share the same key. Here I use selectKey to assign a dummy key, the number 1 in the code above; the value itself has no meaning and simply ensures that every message ends up in the same group. The core logic lives in the reduce call, which performs the actual summation.

Time Window Example #

As mentioned earlier, the Streams DSL supports three types of time windows. The first two (fixed and sliding windows) are created with the TimeWindows.of method, and session windows are created with SessionWindows.with.
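As a quick reference, the three window types can be declared roughly as follows on a grouped stream (a sketch; grouped stands for a hypothetical KGroupedStream, and the window sizes are arbitrary):

// Fixed (tumbling) windows: non-overlapping 5-minute windows.
grouped.windowedBy(TimeWindows.of(Duration.ofMinutes(5))).count();

// Sliding (hopping) windows: 5-minute windows that advance every minute, so they overlap.
grouped.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1))).count();

// Session windows: records separated by less than a 5-minute inactivity gap fall into the same session.
grouped.windowedBy(SessionWindows.with(Duration.ofMinutes(5))).count();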

Suppose in the previous WordCount example, we want to count the words every minute. To do this, you need to add the following line of code before calling count:

.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))

At the same time, you also need to modify the type of counts: it is no longer a KTable<String, Long> but a KTable<Windowed<String>, Long>, because once the time window is introduced, the key of each record must carry the window information. Apart from these changes, the core counting logic of WordCount stays the same.
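Concretely, the windowed version of the counting logic would look roughly like this (the same topology as before with the one-minute window added; TimeWindows, Windowed, and Duration need to be imported):

final KTable<Windowed<String>, Long> counts = source
    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
    .groupBy((key, value) -> value)
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .count();

Note that if you also write this windowed table back to a topic, the key is now a Windowed<String> rather than a plain String, so the to call needs a matching key serde or a key mapping as well.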

As you can see, Streams DSL does a good job of encapsulating the API. Typically, you only need to add or delete a few lines of code to modify the processing logic.

Summary #

Alright, let’s summarize. Today I shared with you the background and key concepts of Kafka Streams and its DSL, then walked through the WordCount program and how to run it, and finally provided sample code for common operators and time windows. These materials should cover most of your stream processing development needs. Additionally, I recommend regularly consulting the official documentation to learn more advanced and in-depth techniques, such as the usage of fixed time windows. In many scenarios we need to know the value of a key metric over a specific period in the past, and time windows are the natural tool for implementing such requirements.

Open Discussion #

The WordCount example given today does not involve calling the time window API. Instead, we are counting the total occurrences of each word. If we want to count the occurrences of words within every 5 minutes, what line of code should we add?

Feel free to share your thoughts and answer. Let’s discuss together. If you find this article helpful, please feel free to share it with your friends.