13 Adopting Message Driven Environments for Processing Billing Notifications and Integration of Messaging Middleware

13 Adopting Message-Driven Environments for Processing Billing Notifications and Integration of Messaging Middleware #

Cache and queue are common strategies for dealing with high-concurrency and high-load environments on the Internet. Cache greatly improves data read and write operations, while queue effectively smooths out pressure peaks and reduces system load. One of the best solutions for implementing queues is to use message middleware. However, message middleware is not limited to queues alone. It can also be used for asynchronous decoupling, message-driven development, and other functionalities. In this chapter, we will explore message-driven development in a microservices environment.

Message Middleware Products #

There are many message middleware products available, including Apache ActiveMQ, RabbitMQ, ZeroMQ, Kafka, Apache RocketMQ, and many more. There are numerous articles on the Internet that discuss how to choose the right one. (Here is an official document comparing ActiveMQ and Kafka: http://rocketmq.apache.org/docs/motivation/). We won’t go into detail about this topic here.

Message-oriented middleware (MOM) is software or hardware infrastructure supporting sending and receiving messages between distributed systems.

The above is the definition of message middleware from Wikipedia. The scenario is quite clear - distributed systems, which can be software or hardware, use message sending and receiving to achieve asynchronous decoupling. In most cases, it consists of three components: message producers, middleware services, and message consumers.

This case study is mainly based on the Spring Cloud Alibaba project. RocketMQ, as part of the project stack, has excellent performance in Alibaba’s product line, making it the preferred choice for more and more projects. In this case, RocketMQ will also be used as the message middleware. Let’s start by understanding the basic principles of RocketMQ.

What is RocketMQ #

RocketMQ is an open-source distributed messaging middleware developed by Alibaba, implemented in pure Java. It has relatively simple implementation of clustering and high availability. It has lower message loss rate when encountering crashes and other failures. It is widely used in many Alibaba product lines and has proven its stability under various high-pressure scenarios. It is currently being managed by the Apache open-source community, which has a higher level of activity. Official website: http://rocketmq.apache.org/.

There are several core modules in RocketMQ:

  • Broker: The core module of RocketMQ, responsible for receiving and storing messages.
  • NameServer: It can be considered as the registry of RocketMQ. It manages two types of data: the routing configuration of topic-queue in the cluster and the real-time configuration information of brokers. Therefore, it is necessary to ensure the availability of the broker/NameServer in order to produce, consume, and transmit messages.
  • Producer and Product Group: The producer is the part that generates messages.
  • Consumer and Consumer Group: The consumer is the part that consumes messages.
  • Topic/Message/Queue: Mainly used to store message content.

img

(RocketMQ architecture diagram, source: official website, all components are shown in Cluster form)

RocketMQ Configuration and Installation #

Prepare the compiled binary installation package, commonly known as the “green version”.

appledeMacBook-Air:bin apple$ wget http://mirror.bit.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-bin-release.zip

appledeMacBook-Air:software apple$ unzip rocketmq-all-4.6.0-bin-release.zip

appledeMacBook-Air:software apple$ cd rocketmq-all-4.6.0-bin-release/bin

appledeMacBook-Air:bin apple$ nohup ./mqnamesrv &

appledeMacBook-Air:bin apple$ nohup ./mqbroker -n localhost:9876 &

In addition, you must configure the NAMESRV_ADDR address, otherwise it cannot be used normally. You can also write it to the profile file or use it directly from the command line:

export NAMESRV_ADDR=localhost:9876

To shut down, first shut down the broker server, then shut down the namesrv.

sh bin/mqshutdown broker
The mqbroker(12465) is running...
Send shutdown request to mqbroker(12465) OK
sh bin/mqshutdown namesrv
The mqnamesrv(12456) is running...
Send shutdown request to mqnamesrv(12456) OK

Testing the Installation #

Start two terminals. In the message producer terminal, enter the following command:

#sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...
#The loop writes messages waiting to be consumed by consumers

In another terminal, enter the consumer command:

#sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...
#The consumed messages are printed directly below

img

Integrating RocketMQ into Services #

When integrating RocketMQ into a Spring Cloud project, the Spring Cloud Stream sub-project is required. When using it, attention must be paid to the compatibility of versions between the sub-project and the main project. There are three key concepts in the project:

  • Destination Binders: Components that integrate with external components, such as Kafka or RabbitMQ.
  • Destination Bindings: The bridge between external message delivery systems and applications, represented by the gray columns in the diagram below.
  • Message: The data entity used by producers or consumers to interact with the message middleware.

img

(Source: Official documentation spring-cloud-stream-overview-introducing)

The following example will deepen the understanding of the above diagram through practice.

Integrating the Consumer Side #

The parking-message module serves as the message consumer. In the pom.xml file, include the dependency jar (the version is not configured here, I believe you know why):

<dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

Add the following configuration in the application.properties file:

#rocketmq config
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
#The name "input" in the configuration below needs to be consistent with the name in the Sink class in the code
spring.cloud.stream.bindings.input.destination=park-pay-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=park-pay-group
#Whether to consume messages in synchronous mode, default is false
spring.cloud.stream.rocketmq.bindings.input.consumer.orderly=true

Here, the default message consumption channel input is used. Add the @EnableBinding({Sink.class}) annotation in the startup class to connect to the message broker component during startup. What is Sink? It is a simple message channel definition built into the project, where Sink represents the destination of the message. The producer side will use Source, which represents the source of the message.

Write the consumer class and add the @StreamListener annotation to make it receive stream processing events and handle the received messages continuously:

@Service
@Slf4j
public class ConsumerReceive {

    @StreamListener(Sink.INPUT)
    public void receiveInput(String json) throws BusinessException {
        // Only used for testing, in production applications, it can be integrated with the corresponding message push interface, such as JPush, WeChat, SMS, etc.
        log.info("Receive input msg = " + json + " by RocketMQ...");

    }
}

Integration on the Producer Side #

In the parking-charging module, when a customer’s vehicle leaves, whether it is a monthly card user or a non-monthly card user, a message needs to be sent to the customer to prompt the deduction information after payment. In the module’s pom.xml file, use the starter to import the JAR file:

<!-- rocketmq -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

application.properties:

# rocketmq config
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
# The name `output` in the following configuration should be consistent with the name used in the `Source` in the code
spring.cloud.stream.rocketmq.bindings.output.producer.group=park-pay-group-user-ouput
spring.cloud.stream.rocketmq.bindings.output.producer.sync=true

spring.cloud.stream.bindings.output.destination=park-pay-topic
spring.cloud.stream.bindings.output.content-type=application/json

Add the @EnableBinding({Source.class}) annotation in the startup class. Note that the key binding here is Source, which corresponds to Sink on the consumer side.

Write the message sending method:

@Autowired
Source source;

@PostMapping("/sendTestMsg")
public void sendTestMsg() {
    Message message = new Message();
    message.setMcontent("This is the first message test.");
    message.setMtype("Payment message");
    source.output().send(MessageBuilder.withPayload(JSONObject.toJSONString(message)).build());
}

Start both the parking-charging and parking-message modules, and call the send message test method. In normal cases, the following log will be output:

2020-01-07 20:37:42.311  INFO 93602 --- [MessageThread_1] c.m.parking.message.mq.ConsumerReceive   : Receive input msg = {"mcontent":"This is the first message test.","mtype":"Payment message"} by RocketMQ...
2020-01-07 20:37:42.315  INFO 93602 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A800696DA018B4AAC223534ED40000 cost: 35 ms

Here, we only used the default Sink and Source interfaces. When there are more channels used in the project, you can customize your own Sink and Source interfaces as long as you follow the writing rules of Sink and Source. Replace the default loading classes in the project to use them normally.

// Customized Sink channel
public interface MsgSink {

    /**
     * Input channel name.
     */
    String INPUT1 = "input1";

    /**
     * @return the input channel that subscribes to a message
     */
    @Input(MsgSink.INPUT1)
    SubscribableChannel myInput();
}

// Customized Source channel
public interface MsgSource {

    /**
     * Name of the output channel.
     */
    String OUTPUT = "output1";

    /**
     * @return the output channel
     */
    @Output(MsgSource.OUTPUT)
    MessageChannel output1();
}

Spring Cloud Stream integrates many message system components. Interested users can try other message systems to see how they differ from RocketMQ. In the above example, we have completed an example of message-driven development through the use of an intermediate layer, which decouples the system asynchronously and allows the system to focus more on its own business logic, such as the parking-message project focusing on handling external message pushes, such as sending WeChat messages, SMS, emails, app push notifications, etc.

A thought-provoking question:

Which one, service invocation between microservices or message-driven development mentioned in this chapter, reduces the coupling between systems more? Which one is more convenient to implement?