
14 Message-Driven - How to Integrate Kafka with KafkaTemplate #

Starting today, we will discuss another important topic in Spring Boot, which is message communication.

Message communication is a representative technology among the middle-tier components of web applications, mainly used to build complex and flexible business processes. In Internet applications, message communication is considered a key technique for achieving system decoupling and handling high concurrency. In this lesson, we will introduce the message communication mechanism into the SpringCSS case to achieve asynchronous interaction between multiple services.

Message Communication Mechanism and the SpringCSS Case #

Before introducing the message communication mechanism and the message middleware, let’s first review the application scenarios in the SpringCSS case.

Message communication scenarios in the SpringCSS case #

In the SpringCSS case, a user's account information does not change frequently. Because account-service and customer-service are two separate services, to reduce the cost of remote interaction we often store a copy of the user account information locally in customer-service and retrieve the user account directly from the local database when generating a customer work order.

Under this design and implementation approach, how should we respond correctly and efficiently when the account information of a user changes? At this time, the message-driven mechanism provides us with a good implementation solution from the perspective of system scalability.

When the user account information changes, account-service first publishes a message announcing that a certain user's account information has changed; all services interested in that message are then notified. In the SpringCSS case, that service is customer-service, which acts as the subscriber and consumer of the message.

With this approach, the customer-service can quickly obtain the user account change message, thereby correctly and efficiently handling the local user account data.

The diagram below shows the overall scenario:


Message communication mechanism in the user account update scenario

From the diagram above, we can see that the message communication mechanism lets us implement the entire interaction at minimal cost, simply and conveniently.

Introduction to the message communication mechanism #

The overall workflow of the message communication mechanism is shown in the diagram below:


Diagram of the message communication mechanism

In the diagram above, the message middleware at the center of the workflow generally ships with client components for sending and receiving messages, and these client components are embedded in the business services.

The message producer is responsible for generating messages, a role typically played by the business system in real scenarios, while the message consumer is responsible for consuming messages, which are generally processed asynchronously by a background system.

There are two basic models for message communication: the publish-subscribe (Pub-Sub) model and the point-to-point model. The publish-subscribe model supports a one-to-many relationship between producers and consumers, while in the point-to-point model each message is delivered to exactly one consumer.
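To make the contrast concrete, here is a minimal in-memory sketch in plain Java (not real Kafka; the class and method names are our own illustrations) showing how the two models differ in who receives a message:

```java
import java.util.ArrayList;
import java.util.List;

// A minimal in-memory sketch (not real Kafka) contrasting the two delivery models.
public class DeliveryModels {

    // Publish-subscribe: every registered subscriber receives each message.
    static List<String> publishSubscribe(String message, int subscribers) {
        List<String> deliveries = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            deliveries.add("subscriber-" + i + " got: " + message);
        }
        return deliveries;
    }

    // Point-to-point: exactly one consumer receives each message.
    static String pointToPoint(String message, int consumers, int messageIndex) {
        int chosen = messageIndex % consumers; // simple round-robin for illustration
        return "consumer-" + chosen + " got: " + message;
    }

    public static void main(String[] args) {
        // Pub-Sub: the message is delivered three times, once per subscriber.
        System.out.println(publishSubscribe("account changed", 3));
        // Point-to-point: the message is delivered exactly once.
        System.out.println(pointToPoint("account changed", 3, 7));
    }
}
```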

These two models form the foundation of message communication systems. On top of them, the industry has established implementation specifications and tools. Representative specifications include JMS and AMQP, along with implementation frameworks such as ActiveMQ and RabbitMQ. Tools like Kafka do not follow a particular specification but provide their own design and implementation for message communication.

In this lesson, we will focus on Kafka, and we will introduce ActiveMQ and RabbitMQ separately in the next two lessons.

Similar to the JdbcTemplate and RestTemplate introduced earlier, Spring Boot, as an integration framework that supports rapid development, also provides a set of template utility classes with the -Template suffix for message communication. For Kafka, this utility class is KafkaTemplate.

Integrating Kafka with KafkaTemplate #

Before discussing how to integrate Kafka with KafkaTemplate, let’s first have a brief understanding of the basic architecture of Kafka and introduce several core concepts in Kafka.

Basic Architecture of Kafka #

The basic architecture of Kafka is shown in the diagram below. From the diagram, we can see that the common concepts in message communication systems such as Broker, Producer, Consumer, Push, and Pull are all reflected in Kafka. Producers use the Push mode to publish messages to Brokers, while consumers use the Pull mode to subscribe to messages from Brokers.


Kafka basic architecture diagram

In the diagram above, we notice that the Kafka architecture also uses ZooKeeper.

ZooKeeper stores Kafka's cluster metadata (and, in older Kafka versions, consumer offsets, which newer versions keep in an internal topic). Its role is to coordinate load balancing between Brokers and consumers. Therefore, to run Kafka, ZooKeeper must be started first, followed by the Kafka servers.

In Kafka, there is another core concept called Topic, which is the basic unit for writing data in Kafka. Each Topic can have multiple replicas to ensure its availability. Each message belongs to one and only one Topic, so when developers use Kafka to send messages, they must specify which Topic the message is published to. Similarly, when consumers subscribe to messages, they must specify which Topic the messages are from.

On the other hand, from the composition structure, a Topic can contain one or more partitions. Therefore, when creating a Topic, we can specify the number of partitions.
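As a rough illustration of how a keyed message maps to one of a Topic's partitions, here is a simplified sketch. Note that real Kafka's default partitioner hashes the serialized key with murmur2; we use String.hashCode() purely for illustration, and the method names are our own:

```java
// Simplified sketch of how a keyed message maps to a partition.
// Real Kafka's default partitioner applies a murmur2 hash to the serialized key;
// String.hashCode() is used here only to illustrate the idea.
public class PartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            return 0; // real Kafka spreads keyless records across partitions
        }
        // Mask off the sign bit so the modulo result is non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition,
        // which is what preserves per-key message ordering.
        System.out.println(partitionFor("user-42", 3));
        System.out.println(partitionFor("user-42", 3));
    }
}
```

The practical takeaway is that messages with the same key stay ordered relative to each other, because they always land on the same partition.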

KafkaTemplate is a template utility class provided by Spring for message communication based on Kafka. To use this template class, we need to add the following Maven dependency to both the message sender and consumer applications:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Sending Messages with KafkaTemplate #

KafkaTemplate provides a series of send methods to send messages. A typical send method is defined as follows:

@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
    return doSend(producerRecord);
}

In the above method, two parameters are actually passed in: the Topic corresponding to the message and the content of the message. With this method, we can complete the basic process of sending messages.

Please note that when using Kafka, we recommend creating Topics in advance for use by message producers and consumers. The command line method to create Topics is shown in the following code:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic springcss.account.topic

Here, a topic named “springcss.account.topic” is created with a replication factor and partition count of 3.

In fact, when the send method of KafkaTemplate is called with a topic that does not yet exist, Kafka will create the topic automatically, provided the broker's auto.create.topics.enable setting is on (which is the default).

On the other hand, KafkaTemplate also provides a set of sendDefault methods, which send messages using the default topic, as shown in the code below:

@Override
public ListenableFuture<SendResult<K, V>> sendDefault(V data) {
    return send(this.defaultTopic, data);
}

From the code, we can see that the sendDefault method internally uses the send method to send messages.

So how do we specify the defaultTopic here? In Spring Boot, we can use the following configuration to accomplish this:

spring:
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: demo.topic

As we can see, KafkaTemplate abstracts away the message sending process and makes it very simple.

Next, let’s switch perspectives and see how to consume the sent messages.

Consuming Messages using @KafkaListener Annotation #

First, it is important to emphasize that if we browse the definition of the KafkaTemplate class, we will not find any methods dedicated to receiving messages. This is closely related to Kafka's design philosophy.

This is also different from JmsTemplate and RabbitTemplate, which will be introduced later in the course, as they both provide explicit receive methods for receiving messages.

From the Kafka architecture diagram provided earlier, we can see that in Kafka, consumers pull messages from the Broker rather than having the server push messages to them. A Kafka consumer therefore provides a listener that subscribes to a specific topic in order to receive its messages; this is the standard way to consume messages in Kafka.

In Spring, there is an @KafkaListener annotation for implementing the listener. The definition of this annotation is as follows:

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
    String id() default "";
    String containerFactory() default "";
    // Message topic
    String[] topics() default {};
    // Topic pattern expression
    String topicPattern() default "";
    // Topic partition
    TopicPartition[] topicPartitions() default {};
    String containerGroup() default "";
    String errorHandler() default "";
    // Message group id
    String groupId() default "";
    boolean idIsGroup() default true;
    String clientIdPrefix() default "";
    String beanRef() default "__listener";
}


In the above code, we can see that the definition of @KafkaListener is quite complex. I have added comments for several common configuration options used in daily development.

When using @KafkaListener, the most important setting is the target Topic; the annotation's topicPattern attribute also lets us match target Topics flexibly with a pattern expression.

Here, it is necessary to emphasize the groupId property, which involves another core concept in Kafka: consumer group.

The purpose of designing consumer groups is to address the problem of multiple service instances in a clustered environment. Obviously, if we adopt the publish-subscribe mode, different instances of a service may consume the same message.

To solve this problem, Kafka provides the concept of consumer groups. Once we use consumer groups, a message can only be consumed by one service instance in the same group.
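The semantics can be summarized as: every group sees every message, but only one instance within each group consumes it. Here is a minimal in-memory simulation in plain Java (not real Kafka; the names and round-robin assignment are our own illustrations):

```java
import java.util.HashMap;
import java.util.Map;

// In-memory sketch (not real Kafka) of consumer-group semantics:
// each group receives every message, but only one member per group consumes it.
public class ConsumerGroupSketch {

    // groups maps a group name to the number of instances in that group;
    // the result maps each group to the single instance that got the message.
    static Map<String, String> deliver(String message, int messageIndex,
                                       Map<String, Integer> groups) {
        Map<String, String> chosenPerGroup = new HashMap<>();
        for (Map.Entry<String, Integer> group : groups.entrySet()) {
            // exactly one instance per group receives the message (round-robin here)
            int instance = messageIndex % group.getValue();
            chosenPerGroup.put(group.getKey(),
                    group.getKey() + "-instance-" + instance);
        }
        return chosenPerGroup;
    }

    public static void main(String[] args) {
        Map<String, Integer> groups = new HashMap<>();
        groups.put("customer-group", 2);
        groups.put("audit-group", 3);
        // Each group gets the message exactly once, regardless of its size.
        System.out.println(deliver("account changed", 5, groups));
    }
}
```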

The basic structure of a consumer group is shown in the diagram below:


Diagram of a Kafka consumer group

When using the @KafkaListener annotation, we can directly add it to the method that handles the message, as shown in the following code:

    @KafkaListener(topics = "demo.topic")
    public void handlerEvent(DemoEvent event) {
        //TODO: Add message handling logic
    }

Of course, we also need to specify the configuration options for message consumption in the consumer’s configuration file, as shown in the following code:

spring:      
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: demo.topic
    consumer:
      group-id: demo.group

Here, in addition to specifying the template.default-topic configuration option, we also specify the consumer.group-id configuration option to specify the consumer group information.

Integrating Kafka in the SpringCSS Case #

After introducing the basic principles of KafkaTemplate, we will introduce Kafka into the SpringCSS case to achieve message communication between the account-service and customer-service.

Implementing the Account-service Message Producer #

First, we create a Spring Boot project to hold the message objects shared by the interacting services.

We name this code project “message” and add an event AccountChangedEvent representing the message body, as shown in the following code:

package com.springcss.message;

import java.io.Serializable;

public class AccountChangedEvent implements Serializable {
    // Event type
    private String type;
    // Operation corresponding to the event (create, update, and delete)
    private String operation;
    // Domain model corresponding to the event
    private AccountMessage accountMessage;
    // Omitted getters/setters
}

The AccountChangedEvent class mentioned above contains the AccountMessage object itself and its operation type. The AccountMessage has the same definition as the Account object, with the additional implementation of the Serializable interface for serialization purposes, as shown in the following code:

public class AccountMessage implements Serializable {
    private Long id;
    private String accountCode;    
    private String accountName;
}
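Implementing Serializable means an AccountMessage can be turned into bytes and reconstructed. A quick round-trip check illustrates this (the SpringCSS case ultimately serializes to JSON via JsonSerializer; plain Java serialization and the helper names below are used only for illustration):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Demonstrates why the message classes implement Serializable:
// the object survives a bytes round trip intact.
public class SerializationDemo {

    static class AccountMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        Long id;
        String accountCode;
        String accountName;
    }

    static byte[] toBytes(AccountMessage m) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(m);
        }
        return bos.toByteArray();
    }

    static AccountMessage fromBytes(byte[] bytes) throws Exception {
        try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (AccountMessage) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        AccountMessage m = new AccountMessage();
        m.id = 1L;
        m.accountCode = "A001";
        m.accountName = "demo";
        AccountMessage copy = fromBytes(toBytes(m));
        System.out.println(copy.accountCode); // prints A001
    }
}
```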

After defining the message entities, we reference the message project from account-service and add a KafkaAccountChangedPublisher class to publish messages, as shown in the following code:

@Component("kafkaAccountChangedPublisher")
public class KafkaAccountChangedPublisher {

    @Autowired
    private KafkaTemplate<String, AccountChangedEvent> kafkaTemplate;

    public void publishEvent(AccountChangedEvent event) {
        kafkaTemplate.send(AccountChannels.SPRINGCSS_ACCOUNT_TOPIC, event);
    }
}

Here, we inject a KafkaTemplate object and use its send method to send a message to the target topic.

The AccountChannels.SPRINGCSS_ACCOUNT_TOPIC here is “springcss.account.topic”. We need to specify the same topic in the configuration file of the account-service, as shown in the following code:

spring:
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: springcss.account.topic
    producer:
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Note that here we use a JsonSerializer to serialize the messages being sent.

Implementing the Customer-service Message Consumer #

For the customer-service consumer, let’s first look at its configuration information, as shown in the code below:

spring:      
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: springcss.account.topic
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: springcss_customer
      properties:
        spring.json.trusted.packages: com.springcss.message

Compared to the producer's configuration, the consumer's configuration adds a few options. The group-id, which we already know from the earlier content, is a consumer-specific option used to specify the consumer group. The value-deserializer mirrors the producer's JsonSerializer and deserializes incoming JSON messages back into objects.

Finally, spring.json.trusted.packages sets the package names trusted for JSON deserialization. This value must match the package of the AccountChangedEvent class, which is com.springcss.message here.

Summary and Preview #

Message communication is a commonly used technology in application development. In today's lesson, we first clarified the application scenarios of message communication based on the SpringCSS case and introduced the basic concepts of this mechanism. Then, taking Kafka, a representative mainstream message middleware, as an example, we used the KafkaTemplate provided by Spring Boot to implement message sending and consumption and integrated it into the SpringCSS case.