18 Message Delivery - How to Ensure that Messages Are Consumed Only Once #

Hello, I’m Tang Yang.

In the previous lesson, we added a message queue to our e-commerce system. We use it to smooth out peak write traffic, process secondary business logic asynchronously, and decouple system modules. Because that business logic has moved out of the synchronous code path, we also need queue consumers that receive the messages and execute it. At this point, your system architecture looks like this:

img

This is a simplified architecture diagram. In reality, as the business logic becomes more complex, more external systems and services will be introduced to solve business problems. For example, we will introduce Elasticsearch to handle product and store search, and an audit system to automatically and manually review the products sold and user comments. You will increasingly use message queues to decouple from external systems and improve system performance.

For example, your e-commerce system needs to introduce a new feature: after a user purchases a certain quantity of products, your system will give the user a cash voucher as an incentive to encourage further purchases. Since the process of issuing vouchers should not be in the main purchase flow, you consider using a message queue for asynchronous processing. At this point, you discover a problem: If a message is lost during delivery, the user will complain about not receiving the voucher. On the other hand, if duplicate messages occur during delivery, your system will incur losses by issuing two vouchers.

So how can we ensure that every message that is produced is consumed, and consumed only once? The question sounds simple, but there is more to it than first appears. In this lesson, I will work through it with you.

Why do messages get lost #

To ensure that messages are consumed only once, we first have to ensure that they are not lost. So where can a message be lost on its way from being written to the message queue to being consumed? There are three main places:

The process of messages being written from the producer to the message queue.

The storage scenario of messages in the message queue.

The process of messages being consumed by the consumer.

img

Next, I will analyze each scenario in detail, so that you can choose the appropriate solution for different scenarios to reduce message loss.

1. Message loss in the message production process #

At this stage, the main risk is the network between the producer and the message queue.

The message producer is usually our business server, and the message queue is deployed on a separate server. Although the two communicate over an intranet, network jitter can still occur, and when it does, messages may be lost due to network errors.

For this situation, I suggest message retransmission: when a send times out, resend the message. You cannot retry without limit, though. Unless the message queue itself has failed or the network connection to it is down, 2-3 retries are generally sufficient.
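The bounded retry described above could be sketched as follows. This is a minimal illustration, not a client library's API: `RetryingSender`, `sendWithRetry`, and `SendTimeoutException` are hypothetical names, and the send operation is passed in as a plain `Supplier`.

```java
import java.util.function.Supplier;

// Hypothetical exception thrown by a send call when the queue does not answer in time.
class SendTimeoutException extends RuntimeException {
    SendTimeoutException(String message) {
        super(message);
    }
}

class RetryingSender {
    // Calls send once, then retries at most maxRetries times if it times out.
    static <T> T sendWithRetry(Supplier<T> send, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        SendTimeoutException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return send.get();
            } catch (SendTimeoutException e) {
                // A timeout does not tell us whether the queue stored the message,
                // so the next attempt may create a duplicate.
                last = e;
            }
        }
        throw last; // every attempt timed out: give up instead of retrying forever
    }
}
```

With `maxRetries` set to 2 or 3, a brief jitter is absorbed, while a real outage surfaces as an exception that the caller can log or alert on.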

However, this solution may produce duplicate messages, so the same message gets consumed more than once. For example, if the message queue is slow to process the write, or network jitter occurs during production, the message may be written to the queue successfully while the producer still sees a timeout. Retransmitting it then creates a duplicate. In the example above, the user would receive two cash vouchers.

Is the message safe once it has reached the message queue? Not necessarily. It can still be lost inside the message queue.

2. Message loss in the message queue #

Take Kafka as an example. Kafka stores messages on local disk. To reduce random disk I/O, it first writes messages to the operating system’s Page Cache and flushes them to disk at a suitable later time.

For example, Kafka can be configured to flush to disk at a certain time interval or when a certain number of messages have been accumulated, which is called asynchronous disk flushing.

Here’s an analogy: if you manage a library and put each book back in place the moment a reader returns it, the work is laborious and inefficient. If you instead reshelve books every 3 hours, or once a certain number of them have piled up, you can put books of the same type back together, spend less time locating shelf positions, and work more efficiently. Asynchronous flushing has a cost, though: if the machine loses power or restarts abnormally, the messages in the Page Cache that have not yet been flushed to disk are lost. So how do we solve this?

You could set a very short flush interval, or flush after every single message. However, frequent flushing significantly hurts performance, and in practice machine crashes and power outages are rare. So I don’t recommend doing this.

img

If your e-commerce system has a low tolerance for message loss, you can deploy Kafka as a cluster and back up the data with multiple replicas, so that messages are lost as rarely as possible.

So how does it work?

In a Kafka cluster, each partition has one Leader, which handles message writes and reads, and can have several Followers, which keep backup copies of the data. The replicas that are keeping up with the Leader form a special set called the ISR (in-sync replicas). When the Leader fails, the new Leader is, by default, elected from the ISR. Followers copy the Leader’s data asynchronously. So when the Leader loses power or crashes, one of the Followers takes over and consumers keep reading from it, which reduces the chance of losing messages.

Because replication from the Leader to the Followers is asynchronous by default, any messages that have not yet been copied to a Follower when the Leader crashes are still lost. To address this, Kafka gives producers an option called “acks”. When it is set to “all”, the Leader acknowledges a message only after every replica in the ISR has received it, and the producer treats the send as successful only once that acknowledgement arrives. This way, a message is lost only if the Leader and all the other ISR replicas fail.

img

As the diagram above shows, with “acks=all” steps 1, 3, and 4 must be performed synchronously, which significantly reduces message production performance. You therefore need to weigh the trade-off carefully for your own application. My suggestions are:

  1. If you need to ensure that no messages are lost, do not enable synchronous flushing on the message queue. Use a cluster instead, and configure it so that a message counts as sent only after all the Followers in the ISR have received it.
  2. If some message loss is tolerable, a cluster is not strictly necessary. If you do deploy one, configure the producer so that a send counts as successful once a single replica has acknowledged it (in Kafka, “acks=1” waits for the Leader only).
  3. Most business systems can tolerate some message loss. In the voucher example above, if a voucher message is lost, we only need to compensate the users who did not receive their voucher afterwards.
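The two producer-side choices in suggestions 1 and 2 can be written down as configuration. The sketch below is only an illustration: `acks` and `retries` are real Kafka producer settings, while the `ProducerProfiles` class, its method names, and the retry count of 3 are assumptions made for this example. Connection settings such as `bootstrap.servers` are left out.

```java
import java.util.Properties;

// Hypothetical holder for two producer configuration profiles.
class ProducerProfiles {
    // For messages that must not be lost: succeed only after all in-sync replicas have the message.
    static Properties strict() {
        Properties props = new Properties();
        props.setProperty("acks", "all");
        props.setProperty("retries", "3"); // bounded retries on transient send failures
        return props;
    }

    // For messages that tolerate some loss: succeed as soon as the Leader has the message.
    static Properties relaxed() {
        Properties props = new Properties();
        props.setProperty("acks", "1");
        props.setProperty("retries", "3");
        return props;
    }
}
```

Either `Properties` object would be completed with connection and serializer settings before being handed to a producer. The voucher flow from this lesson could reasonably use the relaxed profile, given that lost vouchers can be compensated later.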

3. Message loss in the message consumption process #

Let me use Kafka as an example. The progress of a consumer in consuming messages is recorded in the message queue cluster, and the consumption process can be divided into three steps: receiving messages, processing messages, and updating the consumption progress.

Both the receiving and processing processes may encounter exceptions or failures. For example, when receiving messages, if there is network jitter, the message may not be received correctly. When processing messages, there may be some business exceptions that result in the processing flow not being completed. If the consumption progress is updated at this time, then this failed message will never be processed, resulting in loss.

So the rule to follow here is: update the consumption progress only after the message has been both received and processed. This, however, can also cause duplicates. For example, if a consumer crashes right after processing a message but before updating the consumption progress, it will consume the same message again when it restarts.
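The "process first, then update the progress" order, and the duplicate it can cause, can be seen in a small in-memory simulation. This is a sketch under assumptions: `ProgressDemo` and its members are hypothetical, and the crash is simulated with a flag rather than a real process failure.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for a queue plus the consumption progress recorded for one consumer.
class ProgressDemo {
    final List<String> log;                          // messages held by the queue
    final List<String> handled = new ArrayList<>();  // messages the business logic has processed
    int committed = 0;                               // index of the next message to deliver

    ProgressDemo(List<String> log) {
        this.log = log;
    }

    // Receives the next message, processes it, then updates the progress.
    // crashBeforeCommit simulates a crash between processing and the progress update.
    void consumeNext(boolean crashBeforeCommit) {
        if (committed >= log.size()) {
            return; // nothing left to consume
        }
        String message = log.get(committed); // step 1: receive
        handled.add(message);                // step 2: process
        if (crashBeforeCommit) {
            return;                          // progress is unchanged, so the message is redelivered
        }
        committed++;                         // step 3: update the consumption progress
    }
}
```

Calling `consumeNext(true)` and then `consumeNext(false)` on a queue holding one message leaves that message in `handled` twice: nothing is lost, but the business logic has run two times.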

How to Ensure Messages are Consumed Only Once #

From the analysis above, you can see that in order to avoid message loss, we need to pay two costs: performance degradation and the possibility of duplicate message consumption.

We can accept the performance degradation because in general, business systems only send messages to the message queue during write requests, and the volume of write requests in typical systems is not high. However, if a message is consumed multiple times, it can cause errors in business logic processing. So how can we avoid message duplication?

It is difficult to avoid message duplication completely, because network jitter, machine crashes, and processing exceptions are hard to prevent, and the industry has no mature method for eliminating the problem. So we relax the requirement: even if duplicate messages are consumed, the final result must be the same as if each message had been consumed only once. In other words, we need to make message handling “idempotent” in both production and consumption.

1. What is Idempotency #

Idempotency is a mathematical concept that means performing the same operation multiple times yields the same result as performing the operation only once. It might sound a bit abstract, so let me give you an example:

For example, imagine a couple having an argument, and the woman keeps repeating the phrase “Don’t you care about me anymore?” (producing the same message). What she doesn’t know is that the man’s ears (message handler) automatically filter out the repeated phrases, so he only hears it once. This is idempotency.

Suppose that consuming a message decreases the inventory quantity by 1. Consuming two identical messages then decreases it by 2, which is not idempotent. But if the processing logic is to set the inventory quantity to 0, or to subtract 1 only when the current quantity is exactly 10, then consuming the same message several times yields the same result as consuming it once. That is idempotent.

In simple terms, you can understand “idempotency” as: If doing something multiple times produces the same result as doing it once, then this thing is idempotent.
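The inventory example can be made concrete with a small sketch. The `Inventory` class and its method names are hypothetical, chosen only to mirror the three operations mentioned above.

```java
// Hypothetical in-memory inventory record used to compare the three operations.
class Inventory {
    int quantity;

    Inventory(int quantity) {
        this.quantity = quantity;
    }

    // Not idempotent: every redelivered message lowers the stock again.
    void decrement() {
        quantity -= 1;
    }

    // Idempotent: writing an absolute value gives the same result however often it runs.
    void setTo(int value) {
        quantity = value;
    }

    // Idempotent: only the first delivery finds the expected quantity and applies the change.
    boolean decrementIfEquals(int expected) {
        if (quantity != expected) {
            return false; // a duplicate arrives after the quantity has already changed
        }
        quantity -= 1;
        return true;
    }
}
```

Starting from a quantity of 10, two calls to `decrement()` leave 8, whereas two calls to `decrementIfEquals(10)` leave 9, because the second call no longer matches and changes nothing.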

2. Ensuring Idempotency in the Production and Consumption Process #

Messages can be duplicated during both the production and consumption process. Therefore, what you need to do is ensure idempotency in both the production and consumption process, so that in terms of the “final result,” the message is actually consumed only once.

In the message production process, Kafka (since version 0.11) and Pulsar both support “producer idempotency”. With this feature, even if the producer sends the same message more than once, the message queue stores only one copy.

It works by giving each producer a unique ID and assigning a unique ID to each produced message. The message queue server stores the mapping of <Producer ID, Last Message ID>. When a producer generates a new message, the message queue server compares the message ID with the last stored ID. If they match, the server regards it as a duplicate message and discards it.

img
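The producer-side check described above can be sketched in a few lines. This follows the simplified description in this lesson rather than the actual protocol of any message queue, and `DedupBroker` and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory broker that remembers the last message ID seen from each producer.
class DedupBroker {
    private final Map<String, Long> lastMessageId = new HashMap<>(); // <Producer ID, Last Message ID>
    final List<String> stored = new ArrayList<>();                   // messages kept by the queue

    // Returns true if the message was stored, false if it was discarded as a duplicate.
    boolean append(String producerId, long messageId, String payload) {
        Long last = lastMessageId.get(producerId);
        if (last != null && last.longValue() == messageId) {
            return false; // same ID as the last stored message: a retransmission
        }
        lastMessageId.put(producerId, messageId);
        stored.add(payload);
        return true;
    }
}
```

If a producer retries message 1 after a timeout, the second `append` with the same producer ID and message ID returns false, so the queue still holds a single copy.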

On the consumer side, ensuring idempotency is slightly more complex, and you can consider it from both the general level and the business level.

At the general level, when a message is produced, you can use a sequencer to generate a globally unique message ID for it. After the message is processed, store this ID in the database. Before processing the next message, query whether this global ID has been consumed from the database. If it has been consumed, then skip the consumption.

As you can see, both the producer-side idempotency guarantee and the general consumer-side idempotency guarantee have the common approach of generating a unique ID for each message and comparing this ID before using the message. Therefore, this approach is a standard implementation of idempotency that you can directly use in your project. Its pseudocode logic is like this:

// Skip the message if its ID has already been recorded as consumed
boolean isIDExisted = selectByID(ID);
if (isIDExisted) {
  return;
}

process(message); // First delivery: run the business logic
saveID(ID);       // Record the ID so that a redelivery is skipped

However, this approach has a problem: if the consumer crashes after processing a message but before writing its ID to the database, then after the restart it will not find the ID in the database and will run the consumption logic a second time. To prevent this, you need a transaction mechanism that makes message processing and the database write succeed or fail together, which raises the cost of message processing. So if your requirements on duplication are not strict, you can use the general approach as it is, without introducing transactions.
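The "succeed or fail together" requirement can be illustrated with an in-memory stand-in for a database transaction. This is only a sketch: a real system would rely on its database's transactions, and `VoucherStore`, `handle`, and the `failBeforeCommit` flag are hypothetical names introduced for the example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical store that keeps the business data and the consumed message IDs together.
class VoucherStore {
    private Map<String, Integer> vouchers = new HashMap<>(); // userId -> vouchers issued
    private Set<String> consumedIds = new HashSet<>();       // IDs of messages already processed

    // Issues one voucher and records the message ID as a single all-or-nothing step.
    // failBeforeCommit simulates a crash midway: neither change becomes visible.
    synchronized boolean handle(String messageId, String userId, boolean failBeforeCommit) {
        if (consumedIds.contains(messageId)) {
            return false; // duplicate message: skip it
        }
        // Work on copies, like changes that are pending inside a transaction.
        Map<String, Integer> nextVouchers = new HashMap<>(vouchers);
        Set<String> nextIds = new HashSet<>(consumedIds);
        nextVouchers.merge(userId, 1, Integer::sum);
        nextIds.add(messageId);
        if (failBeforeCommit) {
            return false; // "rollback": the copies are simply dropped
        }
        vouchers = nextVouchers; // "commit": both changes are published together
        consumedIds = nextIds;
        return true;
    }

    int issuedTo(String userId) {
        return vouchers.getOrDefault(userId, 0);
    }
}
```

Because the voucher and the message ID are published together, a failure can never leave a voucher issued without its ID recorded, which is exactly the gap in the simpler check-then-save logic.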

How do we handle this at the business level? There are many approaches, and one of them is to add optimistic locking. For example, if your message processing program needs to add money to a person’s account, you can solve it using optimistic locking.

The specific operation is as follows: Add a version field to each person’s account data. When producing a message, first query the version number of the account and send it along with the message to the message queue. On the consumer side, after receiving the message and version number, update the account amount with the specified version number in the SQL statement, like this:

update user set amount = amount + 20, version=version+1 where userId=1 and version=1;

As you can see, we added optimistic locking to the data update. When the first message is consumed, the version value is 1, so the SQL statement matches the row and succeeds, and the version becomes 2. When the second, identical message is processed, the version is no longer 1, so the statement updates no rows. This makes the message handling idempotent.
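The same version check can be expressed in application code. The sketch below mirrors the SQL statement above on an in-memory object. The `Account` class and `addIfVersion` method are hypothetical, and a real system would let the database perform the check as part of the update.

```java
// Hypothetical in-memory account row with a version field for optimistic locking.
class Account {
    long amount;
    int version;

    Account(long amount, int version) {
        this.amount = amount;
        this.version = version;
    }

    // Mirrors: update ... set amount = amount + delta, version = version + 1 where version = expectedVersion
    // Returns true if the update was applied, false if the version no longer matched.
    synchronized boolean addIfVersion(long delta, int expectedVersion) {
        if (version != expectedVersion) {
            return false; // the row was already updated, so this message is a duplicate
        }
        amount += delta;
        version += 1;
        return true;
    }
}
```

For an account at version 1, the first `addIfVersion(20, 1)` adds 20 and moves the version to 2, while a repeated call with the same arguments returns false and leaves the amount unchanged.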

Course Summary #

In this lesson, I explained where messages can be lost in a message queue and how to counter each case. I also discussed how to keep message handling correct when messages are duplicated. The key points I want to emphasize are:

Message loss can be solved through three methods: retrying at the producer end, configuring the message queue in cluster mode, and handling the consumption progress at the consumer end.

Solving message loss usually leads to performance issues and message duplication.

Message duplication can be solved by ensuring the idempotence of message processing.

Although I have mentioned many methods to address message loss, it does not mean that message loss is always unacceptable. After all, as you can see, allowing message loss can improve the performance of the message queue and reduce the complexity of implementation. For example, in the scenario of log processing, the purpose of logs is to troubleshoot system issues, and the likelihood of system issues is low. Therefore, it is acceptable to occasionally lose a few logs.

Therefore, the design of the solution depends on the scenario. This is the principle behind all design. You should not configure every message queue to prevent message loss, nor require all business logic to be idempotent. Doing so would impose an unnecessary burden on development and operations.