16 Message Driven How to Integrate Rabbit Mq With Rabbit Template

16 Message-Driven - How to Integrate RabbitMQ with RabbitTemplate #

In Lesson 15, we introduced sending and consuming messages using ActiveMQ and JmsTemplate, and refactored the account-service and customer-service services in the SpringCSS example system.

Today, we will introduce another mainstream message middleware: RabbitMQ, and add corresponding message communication mechanisms to the SpringCSS example using the RabbitTemplate template tool class.

AMQP Specification and RabbitMQ #

AMQP (Advanced Message Queuing Protocol) is an application layer standard advanced message queue specification that provides a unified messaging service. Similar to the JMS specification, AMQP describes a set of modular components and standard rules for connecting components to clarify the semantics of the interaction between clients and servers. There are also a number of frameworks in the industry that implement the AMQP specification, among which RabbitMQ is highly representative.

AMQP Specification #

In the AMQP specification, there are three core components: Exchange, Queue, and Binding. The Exchange is used to receive messages sent by applications and route these messages to message queues based on certain rules. The message queue is used to store messages until they are safely handled by consumers. The binding defines the association between exchanges and message queues and provides routing rules for them.

The AMQP specification does not explicitly specify the point-to-point model and the publish-subscribe model, like the one-to-one and one-to-many models in JMS. However, by controlling the routing rules between the Exchange and Queue, we can easily simulate the concept of topics in typical message middleware.

How does the Exchange know which Queue to send messages to if there are multiple Queues?

This can be achieved by setting routing information through binding rules. After being associated with multiple Queues, Exchange will have a routing table that maintains the storage conditions for each Queue.

The messages include a routing key, which is generated by the message sender and provided to the Exchange as a standard for routing the message. The Exchange checks the routing key and determines which Queue to route the message to based on the routing algorithm.

The following diagram illustrates the routing relationship between the Exchange and Queue, which shows that a message from a producer can be sent to one or more Queues through the routing algorithm in the Exchange, thus achieving point-to-point and publish-subscribe functions.

Image 7

AMQP routing diagram

In the diagram, different routing algorithms correspond to different types of Exchanges. The AMQP specification specifies several Exchange types, such as Direct Exchange, Fanout Exchange, Topic Exchange, and Header Exchange. However, in this lesson, we will focus on Direct Exchange.

A Direct Exchange can route a message to zero or more queues based on the precise matching of the message’s routing key, as shown in the following diagram:

Image 8

Diagram of Direct Exchange

RabbitMQ Architecture #

RabbitMQ is an AMQP specification implementation framework developed using the Erlang language. The ConnectionFactory, Connection, and Channel are the most basic objects provided by RabbitMQ’s external API and need to comply with the recommendations of the AMQP specification. Among them, the Channel is the most important interface in the interaction between the application and RabbitMQ because most of our business operations need to be completed through the Channel interface, such as defining Queues, defining Exchanges, binding Queues and Exchanges, and publishing messages.

To start RabbitMQ, we only need to run the rabbitmq-server.sh file. However, because RabbitMQ depends on Erlang, we need to ensure that the Erlang environment is installed first.

Next, let’s see how to integrate RabbitMQ using the RabbitTemplate provided by the Spring framework.

Integrating RabbitMQ Using RabbitTemplate #

To integrate RabbitMQ using RabbitTemplate, we first need to add a dependency on spring-boot-starter-amqp to the Spring Boot application, as shown in the following code:

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

Sending Messages Using RabbitTemplate #

Like other template classes, RabbitTemplate provides a set of send methods for sending messages, as shown in the following code:

@Override

public void send(Message message) throws AmqpException {

    send(this.exchange, this.routingKey, message);

}
 


@Override

public void send(String routingKey, Message message) throws AmqpException {

    send(this.exchange, routingKey, message);

}
 


@Override

public void send(final String exchange, final String routingKey, final Message message) throws AmqpException {

    send(exchange, routingKey, message, null);

}

Here, we specify the Exchange for message sending and the routing key used for message routing. Because these send methods send raw message objects, when integrating with business code, we need to convert business objects into Message objects. The example code is as follows:

public void sendDemoObject(DemoObject demoObject) { 

    MessageConverter converter = rabbitTemplate.getMessageConverter(); 

    MessageProperties props = new MessageProperties(); 

    Message message = converter.toMessage(demoObject, props); 

    rabbitTemplate.send("demo.queue", message); 

}

If we do not want to embed raw message objects such as Message in business code, we can also use the convertAndSend method group of RabbitTemplate to achieve this, as shown in the following code:

@Override

public void convertAndSend(Object object) throws AmqpException {

        convertAndSend(this.exchange, this.routingKey, object, (CorrelationData) null);

}
 


@Override

public void correlationConvertAndSend(Object object, CorrelationData correlationData) throws AmqpException {

        convertAndSend(this.exchange, this.routingKey, object, correlationData);

}
 


@Override

public void convertAndSend(String routingKey, final Object object) throws AmqpException {

        convertAndSend(this.exchange, routingKey, object, (CorrelationData) null);

}
 


@Override

public void convertAndSend(String routingKey, final Object object, CorrelationData correlationData)

            throws AmqpException {

        convertAndSend(this.exchange, routingKey, object, correlationData);

}
 


@Override

public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {

        convertAndSend(exchange, routingKey, object, (CorrelationData) null);

}

The convertAndSend method group mentioned above internally completes the automatic conversion process from business objects to raw message objects. Therefore, we can simplify the message sending process using the following code.

public void sendDemoObject(DemoObject demoObject) {

    rabbitTemplate.convertAndSend("demo.queue", demoObject);

}

Of course, sometimes we need to add some properties to the message during the sending process, and it is unavoidable to operate on the native Message object. RabbitTemplate also provides a set of overloaded methods convertAndSend to handle this scenario, as shown in the following code:

@Override
public void convertAndSend(String exchange, String routingKey, final Object message,
    final MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {

    Message messageToSend = convertMessageIfNecessary(message);

    messageToSend = messagePostProcessor.postProcessMessage(messageToSend, correlationData);

    send(exchange, routingKey, messageToSend, correlationData);
}

Note that here we use a MessagePostProcessor class to post-process the generated message. The usage of MessagePostProcessor is shown in the following code:

rabbitTemplate.convertAndSend("demo.queue", event, new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        // Processing for the Message goes here
        return message;
    }
});

The last step in using RabbitTemplate is to add configuration properties in the configuration file. When configuring, we need to specify the address, port, username, password, and other information of the RabbitMQ server, as shown in the following code:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: DemoHost

Note, for the consideration of multi-tenancy and security, AMQP also introduces the concept of Virtual Host. Hence, there is a virtual-host configuration option here.

Virtual Host is similar to an access control group and can contain multiple Exchanges and Queues. When multiple different users use the services provided by the same RabbitMQ server, we can divide it into multiple Virtual Hosts and create the corresponding components in our own Virtual Host. The diagram below shows this:

图片9.png

AMQP model with Virtual Host

Consuming Messages with RabbitTemplate #

Similar to JmsTemplate, when consuming messages with RabbitTemplate, we can use either push mode or pull mode.

In pull mode, a typical example of consuming messages with RabbitTemplate is shown in the following code:

public DemoEvent receiveEvent() {
    return (DemoEvent) rabbitTemplate.receiveAndConvert("demo.queue");
}

Here, we use the receiveAndConvert method in RabbitTemplate, which can pull a message from a specified queue. The code inside receiveAndConvert method is as follows:

@Override
public Object receiveAndConvert(String queueName) throws AmqpException {
    return receiveAndConvert(queueName, this.receiveTimeout);
}

Please note that the inner receiveAndConvert method has a second parameter receiveTimeout, which has a default value of 0, meaning that even if the queue is empty when receiveAndConvert is called, the method will immediately return an empty object instead of waiting for the next message to arrive. This is fundamentally different from JmsTemplate introduced in Lesson 15.

If we want to achieve blocking wait just like JmsTemplate, we can set the receiveTimeout parameter accordingly, as shown in the following code:

public DemoEvent receiveEvent() {
    return (DemoEvent) rabbitTemplate.receiveAndConvert("demo.queue", 2000ms);
}

If we don’t want to specify receiveTimeout every time when calling the method, we can set the receiveTimeout at the RabbitTemplate level by adding a configuration item in the configuration file, as shown in the following code:

spring:
  rabbitmq:
    template:
      receive-timeout: 2000

Of course, RabbitTemplate also provides a set of receive methods that support receiving native messages. However, we still recommend using the receiveAndConvert method to consume messages in pull mode.

After introducing the pull mode, let’s move on to the push mode. Implementing the push mode is also simple, as shown in the following code:

@RabbitListener(queues = "demo.queue")
public void handlerEvent(DemoEvent event) {
    //TODO: Add message handling logic
}

Developers can specify the target queue in @RabbitListener to automatically receive messages from that queue. This implementation method is exactly the same as @JmsListener introduced in Lesson 15.

Integrating RabbitMQ into the SpringCSS Example #

Because the usage of these three template utility classes is very similar, and they can be used to extract common code to form a unified interface and abstract class, as the final lesson introducing message middleware, we would like to abstract the integration method of the three template utility classes in the SpringCSS example.

Implementing the Account-Service Message Producer #

In the account-service, which is the message producer, we extract the AccountChangedPublisher shown below as the unified interface for message publishing.

public interface AccountChangedPublisher {

    void publishAccountChangedEvent(Account account, String operation);

}

Please note that this is a business-oriented interface and does not use the AccountChangedEvent object used for message communication.

In the implementation class AbstractAccountChangedPublisher, which is the implementation of the AccountChangedPublisher interface, we construct the AccountChangedEvent object, as shown in the following code:

public abstract class AbstractAccountChangedPublisher implements AccountChangedPublisher {



    @Override
    //...
}
public void publishAccountChangedEvent(Account account, String operation) {

    AccountMessage accountMessage = new AccountMessage(account.getId(), account.getAccountCode(), account.getAccountName());

    AccountChangedEvent event = new AccountChangedEvent(AccountChangedEvent.class.getTypeName(),

            operation.toString(), accountMessage);

    publishEvent(event);

}

protected abstract void publishEvent(AccountChangedEvent event);

}

AbstractAccountChangedPublisher is an abstract class. We build a message object AccountChangedEvent based on the incoming business object and send the message through the abstract method publishEvent.

For different message brokers, we need to implement the corresponding publishEvent method. Taking Kafka as an example, we refactored the original code and provided the following implementation class KafkaAccountChangedPublisher.

@Component("kafkaAccountChangedPublisher")

public class KafkaAccountChangedPublisher extends AbstractAccountChangedPublisher {

@Autowired

private KafkaTemplate<String, AccountChangedEvent> kafkaTemplate;

@Override

protected void publishEvent(AccountChangedEvent event) {

kafkaTemplate.send(AccountChannels.SPRINGCSS_ACCOUNT_TOPIC, event);

}

}

For RabbitMQ, the implementation of RabbitMQAccountChangedPublisher is similar, as shown in the following code:

@Component("rabbitMQAccountChangedPublisher")

public class RabbitMQAccountChangedPublisher extends AbstractAccountChangedPublisher {

@Autowired

private RabbitTemplate rabbitTemplate;

@Override

protected void publishEvent(AccountChangedEvent event) {

rabbitTemplate.convertAndSend(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE, event, new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

MessageProperties props = message.getMessageProperties();

props.setHeader("EVENT_SYSTEM", "SpringCSS");

return message;

}

});

}

}

For RabbitMQ, before using RabbitMQAccountChangedPublisher to send messages, we need to initialize the Exchange, Queue, and the Binding relationship between them. Therefore, we implement the RabbitMQMessagingConfig configuration class as shown below:

@Configuration

public class RabbitMQMessagingConfig {

public static final String SPRINGCSS_ACCOUNT_DIRECT_EXCHANGE = "springcss.account.exchange";

public static final String SPRINGCSS_ACCOUNT_ROUTING = "springcss.account.routing";

@Bean

public Queue SpringCssDirectQueue() {

return new Queue(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE, true);

}

@Bean

public DirectExchange SpringCssDirectExchange() {

return new DirectExchange(SPRINGCSS_ACCOUNT_DIRECT_EXCHANGE, true, false);

}

@Bean

public Binding bindingDirect() {

return BindingBuilder.bind(SpringCssDirectQueue()).to(SpringCssDirectExchange())

.with(SPRINGCSS_ACCOUNT_ROUTING);

}

@Bean

public Jackson2JsonMessageConverter rabbitMQMessageConverter() {

return new Jackson2JsonMessageConverter();

}

}

The above code initializes a DirectExchange, a Queue, and sets the binding relationship between them. At the same time, we also initialize a Jackson2JsonMessageConverter to convert the message into a serialized object for transmission over the network.

#### Implementing the message consumer in the customer-service

Now, let's go back to the customer-service. First, let's take a look at the unified interface AccountChangedReceiver used to receive messages, as shown in the following code:

public interface AccountChangedReceiver {

// Method for receiving messages in pull mode

void receiveAccountChangedEvent();

// Method for receiving messages in push mode

void handlerAccountChangedEvent(AccountChangedEvent event);

}

AccountChangedReceiver defines the method for receiving messages in both pull mode and push mode. We also extracted an abstract implementation class AbstractAccountChangedReceiver, as shown in the following code:

public abstract class AbstractAccountChangedReceiver implements AccountChangedReceiver {

@Autowired

LocalAccountRepository localAccountRepository;

@Override

public void receiveAccountChangedEvent() {

AccountChangedEvent event = receiveEvent();

handleEvent(event);

}

protected void handleEvent(AccountChangedEvent event) {

AccountMessage account = event.getAccountMessage();

String operation = event.getOperation();

operateAccount(account, operation);

}

private void operateAccount(AccountMessage accountMessage, String operation) {

System.out.print(

accountMessage.getId() + ":" + accountMessage.getAccountCode() + ":" + accountMessage.getAccountName());

LocalAccount localAccount = new LocalAccount(accountMessage.getId(), accountMessage.getAccountCode(),

accountMessage.getAccountName());

if (operation.equals("ADD") || operation.equals("UPDATE")) {

localAccountRepository.save(localAccount);

} else {

localAccountRepository.delete(localAccount);

}

}

protected abstract AccountChangedEvent receiveEvent();

}

Here, we implemented the receiveAccountChangedEvent method of the AccountChangedReceiver interface in the AbstractAccountChangedReceiver and defined an abstract method receiveEvent to receive AccountChangedEvent messages from different message brokers. Once the receiveAccountChangedEvent method retrieves a message, we will update the local database based on the Account object and the corresponding operation.

Next, let's take a look at the RabbitMQAccountChangedReceiver class, which also inherits from the AbstractAccountChangedReceiver abstract class, as shown in the following code:

@Component("rabbitMQAccountChangedReceiver")

public class RabbitMQAccountChangedReceiver extends AbstractAccountChangedReceiver {

@Autowired

private RabbitTemplate rabbitTemplate;

@Override

public AccountChangedEvent receiveEvent() {

return (AccountChangedEvent) rabbitTemplate.receiveAndConvert(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE);

}

@Override

@RabbitListener(queues = AccountChannels.SPRINGCSS_ACCOUNT_QUEUE)

public void handlerAccountChangedEvent(AccountChangedEvent event) {

super.handleEvent(event);

}

}

The above RabbitMQAccountChangedReceiver implements both the receiveEvent abstract method of AbstractAccountChangedReceiver and the handlerAccountChangedEvent method of the AccountChangedReceiver interface. The receiveEvent method is used to actively pull messages, while the handlerAccountChangedEvent method is used to receive pushed messages. We also added the @RabbitListener annotation to this method.

Next, let's take a look at the KafkaAccountChangedListener class, which also inherits the AbstractAccountChangedReceiver abstract class, as shown in the following code:

@Component

public class KafkaAccountChangedListener extends AbstractAccountChangedReceiver {

@Override

@KafkaListener(topics = AccountChannels.SPRINGCSS_ACCOUNT_TOPIC)

public void handlerAccountChangedEvent(AccountChangedEvent event) {

super.handleEvent(event);

}

@Override

protected AccountChangedEvent receiveEvent() {

return null;

}

}

We know that Kafka can only obtain messages through push, so it only implements the handlerAccountChangedEvent method, and the receiveEvent method is empty.

Summary and Preview

In this lesson, we learned about the RabbitMQ message broker and used the RabbitTemplate provided by Spring Boot to send and consume messages. At the same time, based on the integration methods of the three message brokers, we extracted their common points and abstracted the corresponding interfaces and abstract classes, and refactored the implementation process of the SpringCSS system.

In the next lesson, we will discuss the complete security solution provided by Spring.