03 Message Sending API Detailed Explanation and Version Migration Description

Before RocketMQ was donated to Apache and became a top-level project of the Apache Software Foundation, its versions were numbered 3.x; the numbering starts from 4.0.0 after the donation. Because RocketMQ was already open source before becoming an Apache top-level project, version 4.0.0 was already comprehensive. The evolution after 4.0.0 has mainly added new features such as message tracing, ACL, and multiple replicas. RocketMQ’s kernel is very stable, and the client has changed very little. I have tested sending and consuming messages with RocketMQ-Client 4.0.0 against a RocketMQ 4.7.0 server, and everything worked normally.

Starting with this article, we look at message sending in RocketMQ. In RocketMQ, message senders and message consumers are collectively called clients and correspond to RocketMQ’s Client module.

To send messages with RocketMQ, therefore, the following Maven dependency needs to be imported:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>

Next, we introduce the APIs related to message sending in RocketMQ and then demonstrate message sending with a simple example. The next article shows techniques for using the message sending API in real scenarios.

Detailed Explanation of Message Sending API #

The core class hierarchy of RocketMQ message senders is shown in the following diagram:

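[Figure: class diagram of MQAdmin, MQProducer, ClientConfig, DefaultMQProducer, and TransactionMQProducer]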

The explanations for the above class diagram are as follows.

MQAdmin

MQAdmin is the basic management interface of MQ and provides its basic management capabilities. The methods are explained as follows:

void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)

Create a topic. The parameters are as follows:

  • String key: Used to locate the brokers on which the new topic will be created.
  • String newTopic: Topic name.
  • int queueNum: Number of queues for the topic.
  • int topicSysFlag: System parameters of the topic.

long searchOffset(MessageQueue mq, long timestamp)

Look up a message in the message consumption queue by queue and timestamp, and return the offset of the matching message within that queue, that is, the logical offset in the consumequeue file rather than the physical offset in the commitlog file. The parameters are as follows:

  • MessageQueue mq: Message consumption queue.
  • long timestamp: Timestamp.

long maxOffset(final MessageQueue mq)

Query the current maximum logical offset of the message consumption queue, which is the offset in the consumequeue file.

long minOffset(final MessageQueue mq)

Query the current minimum logical offset of the message consumption queue.

long earliestMsgStoreTime(MessageQueue mq)

Return the storage timestamp of the first message in the message consumption queue.

MessageExt viewMessage(String offsetMsgId)

Find a message by its offsetMsgId, which encodes the address of the broker that stored the message and the physical offset of the message.

MessageExt viewMessage(String topic, String msgId)

Find a message based on the topic and the globally unique ID of the message.

QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)

Query messages in batch by index key and time range. The parameters are as follows:

  • String topic: Topic name.
  • String key: Message index key.
  • int maxNum: Maximum number of messages returned in this query.
  • long begin: Start timestamp.
  • long end: End timestamp.
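
To make the roles of key, maxNum, begin, and end concrete, here is a minimal in-memory sketch of the filtering that queryMessage describes. It does not call RocketMQ: StoredMessage and the list-based store are hypothetical stand-ins, and treating begin and end as inclusive bounds is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class QueryMessageSketch {
    /** Hypothetical stand-in for a stored message: only the fields the query looks at. */
    static final class StoredMessage {
        final String topic;
        final String key;
        final long storeTimestamp;

        StoredMessage(String topic, String key, long storeTimestamp) {
            this.topic = topic;
            this.key = key;
            this.storeTimestamp = storeTimestamp;
        }
    }

    /** Returns at most maxNum messages that match topic and key and were stored within [begin, end]. */
    static List<StoredMessage> queryMessage(List<StoredMessage> store, String topic, String key,
                                            int maxNum, long begin, long end) {
        List<StoredMessage> result = new ArrayList<>();
        for (StoredMessage m : store) {
            if (result.size() >= maxNum) {
                break; // maxNum caps the size of a single query result
            }
            boolean inWindow = m.storeTimestamp >= begin && m.storeTimestamp <= end;
            if (inWindow && m.topic.equals(topic) && m.key.equals(key)) {
                result.add(m);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<StoredMessage> store = new ArrayList<>();
        store.add(new StoredMessage("TOPIC_TEST", "ODS2020072615490001", 1_000L));
        store.add(new StoredMessage("TOPIC_TEST", "ODS2020072615490001", 2_000L));
        store.add(new StoredMessage("TOPIC_TEST", "ODS2020072615490002", 2_500L));
        List<StoredMessage> hits =
            queryMessage(store, "TOPIC_TEST", "ODS2020072615490001", 10, 1_500L, 3_000L);
        System.out.println(hits.size()); // only the message stored at 2_000 matches
    }
}
```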

MQProducer

MQProducer is the message sender interface. Its core methods are explained as follows:

void start()

Start the message sender. This method must be called before sending messages.

void shutdown()

Shut down the message sender. When the producer is no longer needed, call this method to release resources.

List<MessageQueue> fetchPublishMessageQueues(String topic)

Query all message consumption queues based on the topic.

SendResult send(Message msg, long timeout)

Synchronous message sending. The parameters are explained as follows:

  • Message msg: Message to be sent.
  • long timeout: Send timeout. The default value is 3 seconds.

void send(Message msg, SendCallback sendCallback, long timeout)

Asynchronous message sending. The parameters are explained as follows:

  • Message msg: Message to be sent.
  • SendCallback sendCallback: Asynchronous send callback interface.
  • long timeout: Send timeout. The default value is 3 seconds.

void sendOneway(Message msg)

Oneway message sending. In this mode the sender does not care about the result: the call returns without reporting whether sending succeeded or failed.
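
The difference between the three modes is mostly about how the outcome reaches the caller. The following sketch models that difference with plain JDK classes only. The Callback interface and the deliver method are hypothetical stand-ins, not the RocketMQ API, and the "broker" is just a local method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class SendModesSketch {
    /** Hypothetical stand-in for an asynchronous send callback. */
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable error);
    }

    /** Pretends to deliver the payload to a broker; an empty payload is treated as a failure. */
    static String deliver(String payload) {
        if (payload == null || payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
        return "SEND_OK:" + payload;
    }

    /** Synchronous: the caller blocks and receives the result or the exception directly. */
    static String sendSync(String payload) {
        return deliver(payload);
    }

    /** Asynchronous: the caller returns immediately; the outcome arrives through the callback. */
    static CompletableFuture<Void> sendAsync(String payload, Callback callback) {
        return CompletableFuture.supplyAsync(() -> deliver(payload))
            .<Void>handle((result, error) -> {
                if (error != null) {
                    callback.onException(error);
                } else {
                    callback.onSuccess(result);
                }
                return null;
            });
    }

    /** Oneway: fire and forget; a failure is swallowed and never reported to the caller. */
    static void sendOneway(String payload) {
        CompletableFuture.runAsync(() -> {
            try {
                deliver(payload);
            } catch (RuntimeException ignored) {
                // Oneway sending does not report the outcome.
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(sendSync("hello"));

        AtomicReference<String> outcome = new AtomicReference<>();
        sendAsync("hello", new Callback() {
            @Override public void onSuccess(String result) { outcome.set(result); }
            @Override public void onException(Throwable error) { outcome.set("FAILED"); }
        }).join(); // joined only so the demo can print what the callback received
        System.out.println(outcome.get());

        sendOneway("hello"); // nothing to print: the result is intentionally unknown
    }
}
```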

SendResult send(Message msg, MessageQueue mq)

Send a message to a specified message queue. Overloads of this method cover the synchronous, asynchronous, and oneway send modes.

SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)

Send a message using a custom queue load-balancing mechanism, implemented by a MessageQueueSelector. arg is the argument passed through to the selector: it arrives as the third parameter of the selector’s select method. MessageQueueSelector is declared as follows:

[Figure: declaration of the MessageQueueSelector interface]

Likewise, overloads of this method support the asynchronous and oneway modes.
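
A common use of a custom selector is to route every message that shares a business key to the same queue. The sketch below shows that idea with JDK types only. QueueSelector is a hypothetical stand-in that mirrors the three-argument select shape, and queues are represented as plain strings rather than MessageQueue objects.

```java
import java.util.Arrays;
import java.util.List;

public class QueueSelectorSketch {
    /** Hypothetical stand-in with the same three-argument shape as the selector's select method. */
    interface QueueSelector<Q, M> {
        Q select(List<Q> queues, M message, Object arg);
    }

    /** Picks a queue from the hash of arg, so equal args always map to the same queue. */
    static <Q, M> QueueSelector<Q, M> hashSelector() {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return (queues, message, arg) -> queues.get(Math.floorMod(arg.hashCode(), queues.size()));
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        QueueSelector<String, String> selector = hashSelector();

        // Two messages of the same order carry the same arg (the order number).
        String first = selector.select(queues, "order created", "ODS2020072615490001");
        String second = selector.select(queues, "order paid", "ODS2020072615490001");
        System.out.println(first.equals(second)); // prints "true"
    }
}
```

The mapping is only stable while the number of queues stays the same; if queues are added or removed, the same arg may map to a different queue.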

TransactionSendResult sendMessageInTransaction(Message msg, Object arg)

Send transactional messages. Transactional messages can only be sent synchronously. arg is an additional parameter used in the transactional message callback interface.

[Figure: transactional message callback interface]

This API was introduced in version 4.3.0.
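
To show where arg ends up, here is a simplified model of the sender side of a transactional send. It is not the RocketMQ implementation: TxListener and LocalState are hypothetical stand-ins, and the model assumes that an undecided local transaction is resolved by a single later check, whereas a real broker drives such checks itself.

```java
import java.util.HashSet;
import java.util.Set;

public class TransactionFlowSketch {
    /** Outcome of the local transaction as reported by the listener. */
    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }

    /** Hypothetical stand-in for the transactional message callback interface. */
    interface TxListener {
        LocalState executeLocalTransaction(String message, Object arg);
        LocalState checkLocalTransaction(String message);
    }

    /**
     * Runs the local transaction with the caller-supplied arg and decides whether the
     * message becomes visible to consumers. An UNKNOWN result triggers one check here.
     */
    static boolean sendInTransaction(String message, Object arg, TxListener listener) {
        LocalState state = listener.executeLocalTransaction(message, arg);
        if (state == LocalState.UNKNOWN) {
            state = listener.checkLocalTransaction(message);
        }
        return state == LocalState.COMMIT;
    }

    public static void main(String[] args) {
        Set<String> savedOrders = new HashSet<>();
        TxListener listener = new TxListener() {
            @Override
            public LocalState executeLocalTransaction(String message, Object arg) {
                // arg carries caller context into the callback; here it is the order number.
                savedOrders.add((String) arg);
                return LocalState.COMMIT;
            }

            @Override
            public LocalState checkLocalTransaction(String message) {
                return savedOrders.isEmpty() ? LocalState.ROLLBACK : LocalState.COMMIT;
            }
        };
        boolean delivered = sendInTransaction("order created", "ODS2020072615490001", listener);
        System.out.println(delivered); // prints "true"
    }
}
```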

SendResult send(Collection<Message> msgs, MessageQueue mq, long timeout)

Send messages to a specified message consumption queue in batches. Batch sending only supports synchronous mode.

SendResult send(Collection<Message> msgs, long timeout)

Send messages in batch. If no timeout is specified, the default of 3 seconds applies.
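
When the list to send is large, it is usually worth splitting it into several smaller batches before calling the batch API. The sketch below shows one way to do that with JDK types only. It assumes that a batch should stay within some byte limit; the limit value, the use of raw byte arrays as message bodies, and the decision to count only body bytes are all illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitSketch {
    /** Splits bodies into consecutive batches whose total body size stays within limitBytes. */
    static List<List<byte[]>> split(List<byte[]> bodies, int limitBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (byte[] body : bodies) {
            // Close the current batch when adding this body would exceed the limit.
            if (!current.isEmpty() && currentSize + body.length > limitBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            // A single body larger than the limit still gets a batch of its own.
            current.add(body);
            currentSize += body.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> bodies = new ArrayList<>();
        bodies.add(new byte[3]);
        bodies.add(new byte[3]);
        bodies.add(new byte[3]);
        // With a 6-byte limit, the first two bodies share a batch and the third starts a new one.
        System.out.println(split(bodies, 6).size()); // prints "2"
    }
}
```

Each resulting sub-list would then be passed to one batch send call.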

Message request(Message msg, long timeout)

In version 4.6.0, RocketMQ introduced the request-response model: the message sender sends a message to the broker, and the call returns only after a consumer has finished processing it. The overloads of request mirror those of send, so they are not repeated here.
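
The request-response idea can be modeled with a correlation ID and a pending future: the sender parks on the future until a reply carrying the same ID arrives or the timeout elapses. The sketch below uses JDK classes only. It is a general illustration of the pattern, not RocketMQ's internal implementation, and the transport parameter is a hypothetical stand-in for the broker and consumer.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

public class RequestReplySketch {
    /** Pending requests keyed by correlation ID; a reply completes the matching future. */
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Sends a request through the transport and blocks until the reply arrives or the timeout elapses. */
    String request(String payload, long timeoutMillis, BiConsumer<String, String> transport)
            throws TimeoutException, InterruptedException, ExecutionException {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        try {
            transport.accept(correlationId, payload); // hand the message to the "broker"
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pending.remove(correlationId); // late replies for this ID are ignored
        }
    }

    /** Called when a consumer's reply arrives; unknown or expired IDs are ignored. */
    void onReply(String correlationId, String reply) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future != null) {
            future.complete(reply);
        }
    }

    public static void main(String[] args) throws Exception {
        RequestReplySketch client = new RequestReplySketch();
        // The "consumer" processes the request on another thread and replies with the same ID.
        String reply = client.request("ping", 1000,
            (id, body) -> CompletableFuture.runAsync(() -> client.onReply(id, body + "-pong")));
        System.out.println(reply); // prints "ping-pong"
    }
}
```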

ClientConfig

ClientConfig holds the client-related configuration. Here we briefly introduce several core parameters; they will be explained in detail in the practical part.

  • String namesrvAddr: NameServer address
  • String clientIP: IP address of the client
  • String instanceName: Instance name of the client
  • String namespace: Namespace to which the client belongs

DefaultMQProducer

Default implementation class for message senders.

TransactionMQProducer

Default implementation class for transactional message senders.

Basic Usage Examples of Message Sending API #

After the summary above, we should have a fairly comprehensive understanding of the message sending API. Next, we write a demo program, without any business context, to demonstrate basic usage.

Friendly reminder: most well-known open source frameworks ship unit tests or example code, and RocketMQ is no exception. The RocketMQ source code contains an example module with a series of official examples, which is very suitable for beginners.

The example code in the example package is shown in the following figure:

[Figure: contents of the example package in the RocketMQ source code]

This article uses the Producer class in the quickstart package to demonstrate a few APIs. The next article covers usage scenarios, methods, and common mistakes in the context of business scenarios.

The example code is as follows:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // The producer must be started before any message is sent
        producer.start();

        // Send a single message
        Message msg = new Message("TOPIC_TEST", "hello rocketmq".getBytes(StandardCharsets.UTF_8));
        SendResult sendResult = producer.send(msg);
        // Output result
        System.out.printf("%s%n", sendResult);

        // Send a message with a key (third constructor argument; the tag is left null)
        msg = new Message("TOPIC_TEST", null, "ODS2020072615490001",
            "{\"id\":1, \"orderNo\":\"ODS2020072615490001\",\"buyerId\":1,\"sellerId\":1  }"
                .getBytes(StandardCharsets.UTF_8));
        sendResult = producer.send(msg);
        // Output result
        System.out.printf("%s%n", sendResult);

        // Batch sending
        List<Message> msgs = new ArrayList<>();
        msgs.add(new Message("TOPIC_TEST", null, "ODS2020072615490002",
            "{\"id\":2, \"orderNo\":\"ODS2020072615490002\",\"buyerId\":1,\"sellerId\":3  }"
                .getBytes(StandardCharsets.UTF_8)));
        msgs.add(new Message("TOPIC_TEST", null, "ODS2020072615490003",
            "{\"id\":4, \"orderNo\":\"ODS2020072615490003\",\"buyerId\":2,\"sellerId\":4  }"
                .getBytes(StandardCharsets.UTF_8)));
        sendResult = producer.send(msgs);
        System.out.printf("%s%n", sendResult);

        // After use, shut down the message producer
        producer.shutdown();
    }
}

The messages just sent can be queried in RocketMQ-Console.

[Figure: the sent messages shown in RocketMQ-Console]

With the API explanations above, the usage is easy to follow, so I don’t think it is necessary to demonstrate every API in detail, and the usage walkthrough ends here. The next article explains how to solve practical problems with the sending API in business scenarios and covers common mistakes.

Evolution of Message Sending API Versions #

Having introduced the message sending API, I would like to walk through several major version changes of the API and briefly explain the motivation behind each, so that you understand not only what the API looks like but also why.

Introduction of Namespace Concept #

The namespace concept was officially introduced in RocketMQ 4.5.1. In the API it shows up in the constructors of DefaultMQProducer, as shown in the following figure:

[Figure: DefaultMQProducer constructors that accept a namespace parameter]

Before version 4.5.1, none of the overloaded constructors of DefaultMQProducer took a namespace parameter.

So the question is: from RocketMQ 4.5.1 on, do we need to pass this parameter when constructing DefaultMQProducer, or can we omit it? What is the parameter for?

As the name suggests, a namespace groups message producers and message consumers together. As I understand it, Namespace lets RocketMQ support scenarios such as multiple environments, multiple tags, and full-link pressure testing.

Let me explain with the full-link pressure test scenario. Here, when a request is pressure-test traffic, its messages should go to a shadow topic; when it is formal traffic, they should go to the formal topic.

[Figure: a producer with the namespace “shadow” sending messages to the shadow topic]

That is, when the send method of DefaultMQProducer is called to send a message to the topic TOPIC_TEST, a producer whose Namespace is “shadow” actually sends the message to the corresponding shadow topic, written here as shadow_TOPIC_TEST. In this way, we only need to choose a different message sender according to the tag carried in the request context. For more information about the full-link pressure test context, please refer to my article:

Full-Link Pressure Test: Context Environment Management

Of course, the namespace of the message producer and consumer must be the same in order to collaborate with each other.

In short, the main purpose of Namespace is to group message producers and message consumers, and the underlying logic is to change the name of the topic.
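
The "change the name of the topic" idea can be sketched in a few lines. This is an illustration only: the wrap helper, the pressureTestTraffic flag, and the `_` separator are assumptions, and the exact name format produced by the real client may differ.

```java
public class NamespaceSketch {
    /** Prefixes the topic with the namespace; an empty namespace leaves the topic unchanged. */
    static String wrap(String namespace, String topic, char separator) {
        if (namespace == null || namespace.isEmpty()) {
            return topic;
        }
        return namespace + separator + topic;
    }

    /** Chooses the namespace from a context flag, as in the full-link pressure test scenario. */
    static String resolveTopic(boolean pressureTestTraffic, String topic) {
        String namespace = pressureTestTraffic ? "shadow" : "";
        return wrap(namespace, topic, '_');
    }

    public static void main(String[] args) {
        System.out.println(resolveTopic(true, "TOPIC_TEST"));  // prints "shadow_TOPIC_TEST"
        System.out.println(resolveTopic(false, "TOPIC_TEST")); // prints "TOPIC_TEST"
    }
}
```

Because the producer and the consumer both apply the same transformation, a consumer in the “shadow” namespace sees only what a producer in the “shadow” namespace sent.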

Request-Response Model API #

RocketMQ introduced the request-response model in version 4.6.0: the message sender sends a message to the broker and must wait for a consumer to process it before the call returns. The related APIs are as follows:

[Figure: request-related methods of the message sending API]

In my opinion, this feature has relatively few usage scenarios, and I wonder whether you share my questions. When a topic is subscribed to by multiple consumer groups, must the sender wait for all subscribers to finish processing, or just one of them? What if no consumer is online? In my experience, message middleware is introduced precisely to decouple the message sender from the message consumer. If the sender has to wait for the consumer anyway, what advantage does this have over an ordinary service call?

Message Trace and ACL #

RocketMQ introduced message trace and ACL in version 4.4.0.

  • Message Trace: Supports tracing the entire process of message sending and message consumption, that is, tracing the sending IP, storage server, and which consumer consumed the message and when.
  • ACL: Access Control List, which can authorize message sending and subscription to topics, allowing only authorized users to send messages to specific topics.

The related API changes are as follows:

[Figure: API changes related to message trace]

ACL relies on the RPCHook mechanism, so the API remains unchanged.
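
Why a hook keeps the sending API unchanged can be seen in a small model: the send path only runs whatever hook was configured, and the hook attaches credentials to each outgoing request. The sketch below uses JDK classes only. RequestHook, the header names, and the choice of HmacSHA1 over the request body are illustrative assumptions, not the actual RocketMQ ACL protocol.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class AclHookSketch {
    /** Hypothetical stand-in for a hook that runs before every outgoing request. */
    interface RequestHook {
        void doBeforeRequest(Map<String, String> headers, byte[] body);
    }

    /** Builds a hook that adds the access key and an HMAC signature of the body to the headers. */
    static RequestHook credentialHook(String accessKey, String secretKey) {
        return (headers, body) -> {
            headers.put("AccessKey", accessKey);
            headers.put("Signature", sign(secretKey, body));
        };
    }

    /** Signs the body with HmacSHA1 and returns the Base64-encoded result. */
    static String sign(String secretKey, byte[] body) {
        try {
            Mac mac = Mac.getInstance("HmacSHA1");
            mac.init(new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), "HmacSHA1"));
            return Base64.getEncoder().encodeToString(mac.doFinal(body));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HMAC is not available", e);
        }
    }

    /** The send path itself is unchanged: it only runs the hook if one was configured. */
    static Map<String, String> send(byte[] body, RequestHook hook) {
        Map<String, String> headers = new HashMap<>();
        if (hook != null) {
            hook.doBeforeRequest(headers, body);
        }
        return headers; // a real client would now transmit headers and body to the broker
    }

    public static void main(String[] args) {
        byte[] body = "hello rocketmq".getBytes(StandardCharsets.UTF_8);
        Map<String, String> headers = send(body, credentialHook("demoAccessKey", "demoSecretKey"));
        System.out.println(headers.containsKey("Signature")); // prints "true"
        System.out.println(send(body, null).isEmpty());       // prints "true": no hook, no credentials
    }
}
```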

Conclusion #

This article gave a detailed introduction to the class hierarchy for message sending and the parameters of each API, then built a demo program to demonstrate the API calls. Finally, it summarized the changes to RocketMQ’s message sending API from version 4.0 to 4.7 and the background behind them.

In the next article, we will apply the message sending API to practical scenarios and analyze common problems.