39 Building an Enterprise-Level Real-time Log Processing Platform Based on Kafka From Scratch #

Hello, I’m Hu Xi. Today I want to share with you the topic: Building an Enterprise-level Real-time Log Streaming Platform from Scratch with Kafka.

Simply put, we will combine several big data components, much like snapping Lego bricks together to build a bigger toy.

In any enterprise, servers generate a large volume of log data every day. This data is rich: it includes online business data, user behavior data, and backend system data. Analyzing it in real time helps us spot emerging trends quickly and make targeted decisions. Today, we will use Kafka to build such a platform.

Stream Processing Architecture #

If you search online for real-time log stream processing, you should be able to find a lot of tutorials teaching you how to build a real-time stream processing platform for log analysis. Most of these tutorials use technology stacks like Flume+Kafka+Storm, Spark Streaming, or Flink. In particular, the combination of Flume+Kafka+Flink has gradually become the standard for real-time log stream processing. However, to build such a processing platform, you need to use three frameworks, which increases the complexity of the system and also increases the maintenance cost.

Today, I will demonstrate how to use Apache Kafka to implement a real-time log stream processing system. In other words, the technology stack I’m using is the combination of Kafka Connect+Kafka Core+Kafka Streams.

The following diagram shows the workflow of a real-time log stream processing platform based on Kafka.

From the diagram, we can see that the logs are continuously produced by the web servers, and then they are sent to the Kafka Connect component in real-time. After processing by the Kafka Connect component, the logs are ingested into a topic in Kafka and then sent to the Kafka Streams component for real-time analysis. Finally, Kafka Streams sends the analysis results to another topic in Kafka.

I briefly introduced the Kafka Connect and Kafka Streams components in the previous articles. The former can facilitate data interaction between external systems and Kafka, while the latter can process messages in Kafka topics in real-time.

Now, let’s use these two components and combine all the Kafka knowledge we have learned earlier to build a real-time log analysis platform.

Kafka Connect Component #

First, we use the Kafka Connect component to collect data. As mentioned earlier, the Kafka Connect component is responsible for connecting Kafka with external data systems. The component that connects to external data sources is called a connector. Common external data sources include databases, key-value stores, search systems, or file systems, among others.

Today, we will use the File Connector to read the access logs of Nginx in real-time. Suppose the format of the access log is as follows:

10.10.13.41 - - [13/Aug/2019:03:46:54 +0800] "GET /v1/open/product_list?user_key=**&user_phone=**&screen_height=1125&screen_width=2436&from_page=1&user_type=2&os_type=ios HTTP/1.0" 200 1321

In this log, the “os_type” field in the request parameters currently has two values: “ios” and “android”. Our goal is to calculate the real-time number of requests from the iOS and Android platforms for that day.

Starting Kafka Connect #

Currently, Kafka Connect supports two modes: standalone and distributed. We will start the Connect component in distributed (cluster) mode.

First, we need to start the Kafka cluster. Let’s assume that the connection address of the broker is localhost:9092.

Once the Kafka cluster is up and running, we start the Connect component. There is a config/connect-distributed.properties file in the Kafka installation directory that you need to modify as follows:

bootstrap.servers=localhost:9092
rest.host.name=localhost
rest.port=8083

The first line specifies the Kafka cluster to connect to, and the second and third lines specify the hostname and port for the REST service exposed by the Connect component. After saving these changes, we run the following command to start Connect:

cd kafka_2.12-2.3.0
bin/connect-distributed.sh config/connect-distributed.properties

If everything goes well, Connect should be successfully started. Now we can access the Connect REST service at localhost:8083 in a browser, and we should see the following response:

{"version":"2.3.0","commit":"fc1aaa116b661c8a","kafka_cluster_id":"XbADW3mnTUuQZtJKn9P-hA"}

Adding the File Connector #

This JSON response indicates that Connect has started successfully. At this point, we open a terminal and run the following command to list the connectors that have been created so far:

$ curl http://localhost:8083/connectors
[]

The result shows that we have not created any connectors yet.

Now, let’s create the corresponding File Connector. This connector reads the specified file, creates a message for each line of text, and sends it to a specific Kafka topic. The create command is as follows:

$ curl -H "Content-Type:application/json" -H "Accept:application/json" http://localhost:8083/connectors -X POST --data '{"name":"file-connector","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","file":"/var/log/access.log","tasks.max":"1","topic":"access_log"}}'
{"name":"file-connector","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","file":"/var/log/access.log","tasks.max":"1","topic":"access_log","name":"file-connector"},"tasks":[],"type":"source"}

Essentially, this command sends a POST request to the Connect REST service to create the corresponding connector. In this example, the connector class we use is the default FileStreamSourceConnector provided by Kafka. The log file we want to read is located in the /var/log directory, and its data will be sent to the Kafka topic named access_log.
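The same request can also be issued from Java. The following is only a rough sketch, assuming Java 11 or later (for java.net.http) and the Connect REST address used in this article. The class ConnectorRequestSketch and its two methods are hypothetical helpers written for illustration; they are not part of Kafka.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class ConnectorRequestSketch {

    // Connector definition copied from the curl command above.
    static final String CONNECTOR_JSON =
            "{\"name\":\"file-connector\",\"config\":{"
            + "\"connector.class\":\"org.apache.kafka.connect.file.FileStreamSourceConnector\","
            + "\"file\":\"/var/log/access.log\",\"tasks.max\":\"1\",\"topic\":\"access_log\"}}";

    // Builds the POST request without sending it.
    static HttpRequest buildRequest(String connectUrl) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(CONNECTOR_JSON))
                .build();
    }

    // Sends the request and returns "<status code> <response body>".
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }
}
```

With a running Connect worker, calling ConnectorRequestSketch.send(ConnectorRequestSketch.buildRequest("http://localhost:8083")) should be equivalent to the curl command. Keeping request construction separate from sending makes the request easy to inspect without a live cluster.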

Now, if we run curl http://localhost:8083/connectors again, we can verify that the connector was created successfully:

$ curl http://localhost:8083/connectors
["file-connector"]

Clearly, a new connector named file-connector has been created. If your production environment has multiple machines, the procedure is just as simple: create such a connector for each machine and have all of them write to the same Kafka topic.

Kafka Streams Components #

Getting the data into Kafka is not enough; we also need to process it in real time. Let me demonstrate how to write a Kafka Streams application that analyzes Kafka topic data in real time.

As we know, Kafka Streams is a component provided by Kafka for real-time stream processing.

Unlike other stream processing frameworks, it is just a library, and the applications written using it are just regular Java applications once they are compiled and packaged. You can use any deployment framework to run Kafka Streams applications.

Moreover, you only need to start multiple application instances, and you will automatically get load balancing and fault tolerance. Therefore, compared to frameworks like Spark Streaming or Flink, Kafka Streams naturally has its advantages.

The image below, taken from the Kafka official website, vividly shows multiple Kafka Streams applications combined together to achieve stream processing scenarios. The image clearly shows three instances of Kafka Streams applications. On one hand, they form a group that participates in and executes the stream processing logic calculations together; on the other hand, they are independent entities with no association with each other, relying solely on Kafka Streams to help them discover and collaborate with each other.

I will provide a detailed explanation of the principles of Kafka Streams later in this column. Today, we just need to learn how to use its API to write a stream processing application that counts the requests sent by the iOS and Android platforms mentioned earlier.

Writing a Stream Processing Application #

To use Kafka Streams, you need to explicitly add the kafka-streams dependency to your Java project. I will demonstrate the configuration methods using the latest 2.3 version for both Maven and Gradle.

Maven:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.3.0</version>
</dependency>


Gradle:
compile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.3.0'

Now, let me first provide the complete code, and then I will explain the meaning of the key parts in the code in detail.

package com.geekbang.kafkalearn;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.WindowedSerdes;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class OSCheckStreaming {

    public static void main(String[] args) {

        // Basic configuration: application id, broker address, and default String serdes.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "os-check-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Inner serde for the String part of windowed keys (note: .class, not .getClass()).
        props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class.getName());

        final Gson gson = new Gson();
        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("access_log");
        // Unwrap the JSON message produced by Connect, keeping only the raw log line.
        source.mapValues(value -> gson.fromJson(value, LogLine.class)).mapValues(LogLine::getPayload)
                // Re-key each record by platform.
                .groupBy((key, value) -> value.contains("ios") ? "ios" : "android")
                // Count requests per platform in 2-second windows.
                .windowedBy(TimeWindows.of(Duration.ofSeconds(2L)))
                .count()
                // Turn the result table back into a stream and write it to the output topic.
                .toStream()
                .to("os-check", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));

        final Topology topology = builder.build();

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the Streams instance cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            // Block the main thread until the shutdown hook fires.
            latch.await();
        } catch (Exception e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

class LogLine {
    private String payload;
    private Object schema;

    public String getPayload() {
        return payload;
    }
}

This code will read the access_log topic in real time, calculate the total number of requests from iOS and Android every 2 seconds, and write this data to the os-check topic.

First, we construct a Properties object. This object is responsible for initializing the key parameters needed by the Streams application. For example, in the example above, we set the bootstrap.servers parameter, the application.id parameter, and the default serializer and deserializer.

You should already be familiar with the bootstrap.servers parameter, so I won’t go into detail here. The application.id is a very important parameter in a Streams program: you must specify a string that is unique within the cluster to identify your Kafka Streams application. The default serde settings specify the classes used for serialization and deserialization while the Streams application runs. In this example, we set the type to String, which means that the serializer converts a String to a byte array, and the deserializer converts the byte array back to a String.
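To make the String serializer and deserializer concrete, here is a tiny stand-alone sketch of the round trip they perform. It uses only the JDK and assumes UTF-8, which is the default encoding of Kafka's String serializer; the class StringSerdeSketch is a hypothetical illustration, not Kafka's actual implementation.

```java
import java.nio.charset.StandardCharsets;

final class StringSerdeSketch {

    // What a String serializer does conceptually: String -> bytes.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // What a String deserializer does conceptually: bytes -> String.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("ios");
        System.out.println(bytes.length);        // prints 3
        System.out.println(deserialize(bytes));  // prints ios
    }
}
```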

After building the Properties instance, the next step is to create a StreamsBuilder object. Later, we will use this builder to implement the specific stream processing logic.

In this example, we implement the following stream processing logic: calculate the total number of requests sent from iOS and Android every 2 seconds. Do you remember what our original data looks like? It is a line in the Nginx log, but the Connect component wraps it in a JSON message before sending it to Kafka, with the original line stored in a payload field next to a schema field. Therefore, we need the help of Gson to restore the JSON string to a Java object, which is why I created the LogLine class in the code.

The mapValues call in the code converts the received JSON string to a LogLine object. Then we call mapValues again to extract the payload field from the LogLine object, which contains the actual log data. In this way, after two mapValues method calls, we successfully converted the original data into the actual Nginx log line data.

It is worth noting that the code uses the mapValues method provided by Kafka Streams. As the name implies, this method only transforms the value of the message, without modifying the key.

In fact, Kafka Streams also provides the map method, which allows you to modify both the key and the value of the message. Generally, mapValues is considered more efficient than map, because changing the key may force the data to be repartitioned for downstream operators, which can reduce performance. Whenever possible, prefer the mapValues method.

After obtaining the actual log line data, we use the groupBy method to prepare for counting. Since we want to count the requests from iOS and Android separately, we group the records under the key "ios" or "android". In the code above, I simply check whether the log line contains a specific keyword to determine which platform it belongs to. The proper way is to parse the Nginx log format and extract the value of the os_type parameter.
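As a rough illustration of that more careful approach, the sketch below pulls the os_type query parameter out of a log line with a regular expression. It assumes the access log format shown earlier in this article; the class OsTypeExtractor and its osType method are hypothetical names, and a production parser would likely need to handle URL encoding and other log formats.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class OsTypeExtractor {

    // Matches "os_type=<value>" only when it appears as a query parameter,
    // i.e. directly after '?' or '&'. The value ends at '&', whitespace, or a quote.
    private static final Pattern OS_TYPE = Pattern.compile("[?&]os_type=([^&\\s\"]+)");

    // Returns the lower-cased os_type value, or empty if the parameter is absent.
    static Optional<String> osType(String logLine) {
        Matcher m = OS_TYPE.matcher(logLine);
        return m.find() ? Optional.of(m.group(1).toLowerCase(Locale.ROOT)) : Optional.empty();
    }

    public static void main(String[] args) {
        String line = "10.10.13.41 - - [13/Aug/2019:03:46:54 +0800] "
                + "\"GET /v1/open/product_list?from_page=1&user_type=2&os_type=ios HTTP/1.0\" 200 1321";
        System.out.println(osType(line).orElse("unknown")); // prints ios
    }
}
```

In the Streams code, the grouping function could then use something like OsTypeExtractor.osType(value).orElse("unknown") as the key, so that a path such as /ios/help without an os_type parameter is not miscounted as an iOS request.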

After grouping, we also need to define the time window over which the requests from each platform are counted. In this example, I call the windowedBy method to ask Kafka Streams to calculate the total number of requests from both platforms every 2 seconds. With the time window set, the next step is to call the count method to perform the counting.
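To see how a record ends up in a particular 2-second window, the following sketch computes the window boundaries by hand. It assumes fixed-size, non-overlapping windows aligned to the epoch, which is how windows of this kind are documented to behave in Kafka Streams; the class TumblingWindowSketch is a hypothetical illustration and not the library's code.

```java
final class TumblingWindowSketch {

    // Start of the window that contains the given timestamp (inclusive).
    static long windowStart(long timestampMs, long sizeMs) {
        return Math.floorDiv(timestampMs, sizeMs) * sizeMs;
    }

    // End of the window that contains the given timestamp (exclusive).
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 2_000L;               // 2-second windows, as in the example
        long timestamp = 1_565_743_789_500L; // a record timestamp in milliseconds
        System.out.println(windowStart(timestamp, sizeMs)); // prints 1565743788000
        System.out.println(windowEnd(timestamp, sizeMs));   // prints 1565743790000
    }
}
```

Every record whose timestamp falls in the same [start, end) range is counted under the same windowed key.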

After all this is done, we need to call the toStream method to convert the table that was just calculated into an event stream, so that we can observe its content in real time. I will explain the concepts of streams and tables in the stream processing field and their differences in the last few sections of this column. Here, you only need to know that toStream converts a Table into a Stream.

Finally, we call the to method to continuously write the time window statistics data to the Kafka topic named os-check, thereby achieving our requirement of real-time analysis and processing of Nginx logs.

Starting the Stream Processing Application #

Since the Kafka Streams application is just a regular Java application, you can compile, package, and deploy it in a way you are familiar with. The OSCheckStreaming.java in this example is an executable Java class, so you can run it directly. If everything goes well, it will continuously write the statistical data to the os-check topic.

Viewing the Statistical Results #

If we want to view the statistical results, a simple method is to use the kafka-console-consumer script provided by Kafka. The command is as follows:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic os-check --from-beginning --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --property print.key=true --property key.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer --property key.deserializer.default.windowed.key.serde.inner=org.apache.kafka.common.serialization.Serdes\$StringSerde
[android@1565743788000/9223372036854775807] 1522
[ios@1565743788000/9223372036854775807] 478
[ios@1565743790000/9223372036854775807] 1912
[android@1565743790000/9223372036854775807] 5313
[ios@1565743792000/9223372036854775807] 780
[android@1565743792000/9223372036854775807] 1949
[android@1565743794000/9223372036854775807] 37
...

Since each result is computed for a specific time window, the key of the message that carries the result also encapsulates the window information. The format is [ios or android@start time/end time], and the value of the message is a plain number: the total number of requests in that window. Note that the end time printed above is 9223372036854775807, which is Long.MAX_VALUE. This is most likely because the console consumer's deserializer is not told the window size, so only the start time is meaningful in this output.

If we subtract the start times of adjacent ios output lines, we find that results are indeed produced for every 2-second window, and each window carries counts for both iOS and Android. Next, you can subscribe to this Kafka topic and export the results in real time to whatever data storage system you need.
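The subtraction can be checked with a few lines of Java. This is only a sketch that parses the textual key format printed by the console consumer above; the class WindowedKeySketch is a hypothetical helper, and it assumes keys always look like [key@start/end].

```java
final class WindowedKeySketch {
    final String key;
    final long startMs;
    final long endMs;

    private WindowedKeySketch(String key, long startMs, long endMs) {
        this.key = key;
        this.startMs = startMs;
        this.endMs = endMs;
    }

    // Parses text such as "[ios@1565743788000/9223372036854775807]".
    static WindowedKeySketch parse(String text) {
        if (text.length() < 2 || text.charAt(0) != '[' || text.charAt(text.length() - 1) != ']') {
            throw new IllegalArgumentException("Unexpected key format: " + text);
        }
        String body = text.substring(1, text.length() - 1);
        int at = body.lastIndexOf('@');
        int slash = body.lastIndexOf('/');
        if (at < 0 || slash < at) {
            throw new IllegalArgumentException("Unexpected key format: " + text);
        }
        return new WindowedKeySketch(
                body.substring(0, at),
                Long.parseLong(body.substring(at + 1, slash)),
                Long.parseLong(body.substring(slash + 1)));
    }

    public static void main(String[] args) {
        WindowedKeySketch first = parse("[ios@1565743788000/9223372036854775807]");
        WindowedKeySketch second = parse("[ios@1565743790000/9223372036854775807]");
        // Difference between adjacent window start times, in milliseconds.
        System.out.println(second.startMs - first.startMs); // prints 2000
    }
}
```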

Summary #

In conclusion, the real-time log stream processing platform based on Apache Kafka has been set up. During the setup process, we only used Kafka as the big data framework to install, configure, and develop all components. Compared to a technology stack like Flume+Kafka+Flink, the pure Kafka solution has significant advantages in terms of operation and management costs. If you plan to build a real-time stream processing platform from scratch, you might consider the combination of Kafka Connect, Kafka Core, and Kafka Streams.

In fact, Kafka Streams provides much more functionality than just counting. Today, I only showed you the tip of the iceberg of Kafka Streams. In the upcoming columns, I will focus on introducing you to the usage and management of Kafka Streams components. Stay tuned.

Open Discussion #

Please compare the Flume+Kafka+Flink solution with the pure Kafka solution and consider their respective advantages and disadvantages. In practical scenarios, how should we choose between them?

Feel free to write down your thoughts and answers, and let’s discuss together. If you find it helpful, you can also share the article with your friends.