15 Message Driven How to Integrate Active Mq With Jms Template

15 Message-Driven - How to Integrate ActiveMQ with JmsTemplate #

In Lecture 14, we introduced how to send and consume messages using Kafka and KafkaTemplate, and refactored the account-service and customer-service in the SpringCSS sample system. Today, we will continue with ActiveMQ and add the corresponding message communication mechanism to the SpringCSS sample using the JmsTemplate template class.

JMS Specification and ActiveMQ #

JMS (Java Messaging Service) is a Java message service that provides a set of abstracted common APIs based on the message delivery semantics. Currently, there are several implementations of the JMS specification in the industry, with ActiveMQ being one of the representative ones.

JMS Specification #

The JMS specification provides a set of core interfaces for developers to use, which form the API system of the client, as shown in the diagram below:

Image.png

Core APIs in the JMS specification

From the above diagram, we can see that we can create a Connection through ConnectionFactory, and the MessageProducer and MessageConsumer, as the producer and consumer of the client, interact with the server through the Session provided by Connection. The medium of interaction is various messages that are encapsulated and contain the destination address (Destination).

A JMS message consists of two main parts: the message header and the message body.

The message body only contains the specific business data, while the message header contains the common attributes defined in the JMS specification, such as the unique identifier of the message (MessageId), the destination address (Destination), the receiving time of the message (Timestamp), the expiration time (Expiration), the priority (Priority), the persistence mode (DeliveryMode), etc. These common attributes constitute the basic metadata of message communication and are set by default by the message communication system.

In the JMS specification, the point-to-point model is represented by a queue, which provides a mechanism for one-to-one sequential sending and consumption of messages. In addition to the common API, the point-to-point model API specifically distinguishes the producer, QueueSender, and the consumer, QueueReceiver.

On the other hand, a Topic is an abstraction of the publish-subscribe model in the JMS specification. JMS also provides dedicated TopicPublisher and TopicSubscriber.

For a Topic, since multiple consumers may consume a message at the same time, there is the concept of message replication. Compared to the point-to-point model, the publish-subscribe model is generally used in non-reactive request scenarios such as updates, events, and notifications. In these scenarios, the consumer and producer are transparent, and the consumer can be statically managed through configuration files or dynamically created during runtime. Unsubscribing is also supported.

ActiveMQ #

There are various third-party implementation methods of the JMS specification, such as ActiveMQ, WMQ, and TIBCO, among which ActiveMQ is one of the mainstream ones.

For ActiveMQ, there are currently two implementation projects to choose from: the classic version 5.x and the next-generation Artemis. Regarding the relationship between the two, we can simply consider Artemis as the future version of ActiveMQ, representing the development trend of ActiveMQ. Therefore, in this course, we will use Artemis to demonstrate the message communication mechanism.

If we want to start the Artemis service, we first need to create a service instance by running the following command:

artemis.cmd create D:\artemis --user springcss --password springcss_password

Then, by executing the following command, we can start this Artemis service instance normally:

D:\artemis \bin\artemis run

Spring provides friendly integration with the JMS specification and various implementations. By directly configuring a Queue or Topic, we can simplify the operations on Artemis using the methods provided by JmsTemplate.

Integrating ActiveMQ with JmsTemplate #

To use JmsTemplate based on Artemis, we need to add the dependency on spring-boot-starter-artemis to the Spring Boot application, as shown in the following code:

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-artemis</artifactId>

</dependency>

Before discussing how to use JmsTemplate to send and consume messages, let’s first analyze the working modes of the message producer and consumer.

Usually, the behavior pattern of producers is single, while there are some specific classifications for consumers depending on the way they consume messages. Common classifications include push consumers and pull consumers.

Push mode refers to the application system registering a Listener interface with the consumer object and consuming messages through the callback methods of the Listener interface. In pull mode, the application system usually actively calls the consumer’s method to pull messages for consumption, and the control is held by the application system.

In the two basic models of message communication, the publish-subscribe model supports a one-to-many relationship between producers and consumers, and it is a typical implementation mechanism of push consumers. On the other hand, the point-to-point model has only one consumer, and they mainly consume messages through polling based on periodic pulling.

In Lecture 14, we mentioned that the way to consume messages in Kafka is a typical implementation of push consumers, so KafkaTemplate only provides methods to send messages and does not provide methods to consume messages. However, it is different for JmsTemplate. It supports both push consumers and pull consumers. Now let’s see how to send messages using JmsTemplate.

Sending messages using JmsTemplate #

JmsTemplate provides a set of send methods for sending messages, as shown in the following code:

@Override

public void send(MessageCreator messageCreator) throws JmsException {

}

 

@Override

public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {

}

 

@Override

public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {

}

On one hand, these methods specify the target Destination, and on the other hand, they provide a MessageCreator interface for creating message objects, as shown in the following code:

public interface MessageCreator {

    Message createMessage(Session session) throws JMSException;

}

A typical implementation of sending messages through the send method is shown in the following code:

public void sendDemoObject(DemoObject demoObject) { 

    jmsTemplate.send("demo.queue", new MessageCreator() { 

        @Override 

        public Message createMessage(Session session) 

        throws JMSException {
return session.createObjectMessage(demoObject);

Different from KafkaTemplate, JmsTemplate also provides a set of more convenient methods for message sending, namely convertAndSend method, as shown in the following code:

public void convertAndSend(Destination destination, final Object message) throws JmsException {

}

With the convertAndSend method, we can directly pass any business object, and this method can automatically convert the business object into a message object and send the message. An example of this method is shown in the following code:

public void sendDemoObject(DemoObject demoObject) {

    jmsTemplate.convertAndSend("demo.queue", demoObject);

}

In the above code, we notice that the convertAndSend method also has a batch of overloaded methods, which include message post-processing functionality, as shown in the following code:

@Override
public void convertAndSend(Destination destination, final Object message, final MessagePostProcessor postProcessor) throws JmsException {

}

The MessagePostProcessor in the above method is a type of message post-processor, which is used to add custom message properties during the message construction process. One typical use of it is shown in the following code:

public void sendDemoObject(DemoObject demoObject) {

    jmsTemplate.convertAndSend("demo.queue", demoObject, new MessagePostProcessor() {

        @Override
        public Message postProcessMessage(Message message) throws JMSException {
            // Message processing
            return message;
        }

    });
}

The last step of using JmsTemplate is to add configuration items in the configuration file, as shown in the following code:

spring:
  artemis:
    host: localhost
    port: 61616
    user: springcss
    password: springcss_password

Here, we specify the address, port, username, and password of the Artemis server. At the same time, we can also specify the Destination information in the configuration file. The specific configuration method is shown in the following code:

spring:
  jms:
    template:
      default-destination: springcss.account.queue

Consuming Messages with JmsTemplate #

Based on the previous discussion, we know that JmsTemplate supports both push-type and pull-type consumption modes. Let’s first look at how to implement the pull-type consumption mode.

JmsTemplate provides a set of receive methods for pulling messages from Artemis, as shown in the following code:

public Message receive() throws JmsException {

}

public Message receive(Destination destination) throws JmsException {

}

public Message receive(String destinationName) throws JmsException {

}

At this point, we need to pay attention to one point: when calling the above methods, the current thread will be blocked until a new message arrives. For blocking scenarios, the usage of the receive method is as shown in the following code:

public DemoEvent receiveEvent() {

    Message message = jmsTemplate.receive("demo.queue");
    return (DemoEvent) messageConverter.fromMessage(message);

}

Here, we use a messageConverter object to convert the message into a domain object. When using JmsTemplate, we can use the MappingJackson2MessageConverter, MarshallingMessageConverter, MessagingMessageConverter, and SimpleMessageConverter provided by Spring to perform message conversion. By default, the system uses SimpleMessageConverter. In daily development, we usually use MappingJackson2MessageConverter to convert between JSON strings and objects.

At the same time, JmsTemplate also provides a set of more advanced receiveAndConvert methods to receive and convert messages. The code is as follows:

public Object receiveAndConvert(Destination destination) throws JmsException {

}

As the name suggests, the receiveAndConvert method can automatically convert the received message object, making the code for receiving messages simpler. The code is as follows:

public DemoEvent receiveEvent() { 

    return (DemoEvent)jmsTemplate.receiveAndConvert("demo.queue"); 

}

Of course, on the consumer side, we also need to specify the same MessageConverter and Destination as the sender to implement message conversion and set the message destination respectively.

After introducing the pull model, let’s introduce the message consumption method under the push model. The implementation method is also simple, as shown in the following code:

@JmsListener(queues = "demo.queue")
public void handlerEvent(DemoEvent event) {
    //TODO: Add message processing logic
}

Under the push model, developers only need to specify the target queue in the @JmsListener annotation to automatically receive messages from that queue.

Integrating ActiveMQ in the SpringCSS Example #

ActiveMQ is the second message middleware used in this column. Because each message middleware needs to set some configuration information, it is necessary for us to return to the SpringCSS example system and optimize the management of configuration information.

Implementing the Account Service Message Producer #

First, let’s review the content of “Multi-dimensional Configuration: How to Use the Configuration System in Spring Boot?”. In Spring Boot, we can use profiles to effectively manage configuration information for different scenarios and environments.

In the SpringCSS example, Kafka, ActiveMQ, and RabbitMQ, which will be introduced in Lecture 16, are all message middlewares. During the runtime of the example system, we need to choose one of the middlewares to demonstrate the process of sending and receiving messages. Therefore, we need to set different profiles for different middlewares.

In the account-service, we can build the following configuration file system based on profiles.

Drawing 1.png

Configuration files in the account-service

From the above diagram, we can see that we provide three configuration files for three different middlewares. Taking application-activemq.yml as an example, it contains the following configuration items:

spring:
  jms:
    template:
      default-destination: springcss.account.queue
  artemis:
    host: localhost
    port: 61616
    user: springcss
    password: springcss_password
    embedded:
      enabled: false

In the main configuration file application.yml, we can set the currently available profile to activemq, as shown in the following code:

spring:
  profiles:
    active: activemq

After introducing the optimized management solution for configuration information, let’s take a look at the ActiveMQAccountChangedPublisher class, which implements message sending. The code is as follows:

@Component("activeMQAccountChangedPublisher")
public class ActiveMQAccountChangedPublisher{
 

    @Autowired
    private JmsTemplate jmsTemplate;
 

    @Override
    protected void publishEvent(AccountChangedEvent event) {
        jmsTemplate.convertAndSend(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE, event, this::addEventSource);
    }
 

    private Message addEventSource(Message message) throws JMSException {
        message.setStringProperty("EVENT_SYSTEM", "SpringCSS");
        return message;
    }
}

In the code above, we use the convertAndSend method of JmsTemplate to send the message. We also notice that a lambda expression is used here as another way to implement MessagePostProcessor, which can simplify the code organization.

On the other hand, in this case, we want to use MappingJackson2MessageConverter to convert the message. Therefore, we can add a ActiveMQMessagingConfig in the account-service to initialize the specific MappingJackson2MessageConverter object, as shown in the following code:

@Configuration
public class ActiveMQMessagingConfig {
 
    @Bean
    public MappingJackson2MessageConverter activeMQMessageConverter() {
        MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
        messageConverter.setTypeIdPropertyName("_typeId");
 
        Map<String, Class<?>> typeIdMappings = new HashMap<String, Class<?>>();
        typeIdMappings.put("accountChangedEvent", AccountChangedEvent.class);
        messageConverter.setTypeIdMappings(typeIdMappings);
 
        return messageConverter;
    }
}

The core function of the above code is to define a typeId to Class map. This is done to provide flexibility for message conversion. For example, in the account-service, we can send a business object with an ID of “accountChangedEvent” and the type of AccountChangedEvent. In the scenario where the message is consumed, we only need to specify the same ID, and the corresponding message can be automatically converted to another type of business object (not necessarily AccountChangedEvent).

Implementing the message consumer in customer-service #

Let’s go back to the customer-service and see how to consume messages from account-service. The code is shown below:

@Component("activeMQAccountChangedReceiver")
public class ActiveMQAccountChangedReceiver {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    protected AccountChangedEvent receiveEvent() {
        return (AccountChangedEvent) jmsTemplate.receiveAndConvert(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE);
    }
}

Here, we simply use the receiveAndConvert method of JmsTemplate to pull messages from ActiveMQ.

Please note that because the execution process of the receiveAndConvert method is a blocking pull behavior, we can implement a new Controller specifically to test the effectiveness of this method, as shown in the following code:

@RestController
@RequestMapping(value="messagereceive")
public class MessageReceiveController {

    @Autowired
    private ActiveMQAccountChangedReceiver accountChangedReceiver; 

    @RequestMapping(value = "", method = RequestMethod.GET)
    public void receiveAccountChangedEvent() {
        accountChangedReceiver.receiveAccountChangedEvent();
    }
}

Once we access this endpoint, the system will pull messages that have not yet been consumed from ActiveMQ. If ActiveMQ does not have any pending messages to be consumed, this method will block and wait until new messages arrive.

If you want to consume messages using the message push method, the implementation process is even simpler, as shown in the following code:

@Override
@JmsListener(destination = AccountChannels.SPRINGCSS_ACCOUNT_QUEUE)
public void handlerAccountChangedEvent(AccountChangedEvent event) { 
    AccountMessage account = event.getAccountMessage();
    String operation = event.getOperation(); 

    System.out.print(accountMessage.getId() + ":" + accountMessage.getAccountCode() + ":" + accountMessage.getAccountName());
}

From the code above, we can see that we can directly consume messages pushed from ActiveMQ using the @JmsListener annotation. Here we just print out the message, but you can process the message in any desired way.

Summary and Next Lesson #

In this lesson, we continue to introduce ActiveMQ, a message middleware based on the JMS specification, and use the JmsTemplate provided by Spring Boot to send and consume messages. Similarly, we integrate the usage of ActiveMQ into the SpringCSS case and optimize the configuration management process based on Spring Boot’s configuration system. In the next lesson, we will continue to introduce RabbitMQ, another message middleware to be introduced in this course, as well as the RabbitTemplate template tool class provided by Spring Boot.