28 Learning Programming Patterns Based on Files From Rocket Mq Ii

28 Learning Programming Patterns Based on Files from RocketMQ II #

Sync Flush, Async Flush #

In file-based programming models, in order to improve the performance of file writes, memory mapping mechanisms are usually introduced. However, everything has its pros and cons. With the introduction of memory mapping and page caching mechanisms, data is first written to the page cache without actually being persisted to disk. In this case, should the Broker return success to the client after storing the message sent by the client in the page cache, or only after persisting it to disk?

This is another choice, a trade-off between performance and message reliability. To address this, RocketMQ provides multiple persistence strategies: synchronous flush and asynchronous flush.

The term “flush to disk” may sound grand, but it’s not a mysterious or esoteric term. In fact, it simply means synchronizing data in memory to disk, which is accomplished by calling the force method of FileChannel or MappedByteBuffer, as shown below:

1

Next, let’s discuss the implementation techniques for sync flush and async flush.

Sync Flush #

Sync flush refers to the scenario where, after receiving a message sending request from the message sender, the Broker writes it to memory first, and then persists the content to disk before returning successfully to the client.

Let’s think about this: does RocketMQ persist only one message to disk for each message write during sync flush? The answer is no.

In RocketMQ, the entry point for sync flush is the handleDiskFlush method in the commitlog, as shown below:

2

There are two key points here:

  • The class responsible for handling sync flush is GroupCommitService. Have you noticed why the name contains the word “Group”? This is because it represents group commit, which means that more than one message is flushed to disk in one flush operation, instead of just one message.
  • A programming technique is used here: the design pattern of CountDownLatch with an expiration time. An asynchronous request is sent, and then the await method with an expiration time is called to wait for the result of the asynchronous processing. This achieves the synchronous-to-asynchronous programming model and decouples business logic.

Next, let’s continue to explore the design philosophy behind group commit.

3

The condition for determining the success of a flush request is that the current flush pointer is greater than the physical offset of the message. A flush retry mechanism is used here. Then it wakes up the main thread and returns the flush result.

The core idea of group commit is that the MappedFileQueue.flush method is used for flushing, and it doesn’t flush only one message to disk, but flushes all the unflushed data to disk at once. Therefore, even in the case of sync flush, not every message will be flushed by the flush method. To illustrate the design philosophy of group commit more intuitively, the following flowchart is provided:

4

Async Flush #

The advantage of sync flush is that it ensures that no messages are lost, meaning that returning success to the client guarantees that the message has been persisted to disk, making it very reliable. However, this is achieved at the expense of write performance. However, since RocketMQ messages are first written to the page cache, the possibility of message loss is smaller. If you can tolerate a certain probability of message loss but want to improve performance, you can consider using async flush.

Async flush means that after the Broker stores the message to the page cache, it immediately returns success and then starts an asynchronous thread to periodically call the force method of FileChannel to flush the data in memory to disk. The default interval is 500ms. In RocketMQ, the implementation class for async flush is FlushRealTimeService. When you see the default interval of 500ms, can you guess whether FlushRealTimeService uses a timer task?

Actually, no. Here, a CountDownLatch await method with an expiration time is introduced. The benefit of this design is that if there are no new messages being written, it will sleep for 500ms. However, if a new message is received, it can be awakened to ensure timely flush of messages, rather than having to wait for exactly 500ms.

File Recovery Mechanism #

First, let’s take a look at RocketMQ’s file forwarding mechanism:

5

In RocketMQ, data is first written to the commitlog file, and then the consumequeue, indexFile, and other files are asynchronously forwarded based on the commitlog file. Since the forwarding is asynchronous, it’s possible that the data in the commitlog file and consumequeue file may be inconsistent. For example, when RocketMQ is shut down, some data may not be forwarded to the consumequeue. In this case, how can we recover and ensure data consistency when restarting?

Before explaining RocketMQ’s file recovery mechanism, let’s present a few exception scenarios:

  • After writing to the commitlog file and performing synchronous flush, that is, after the message has been written into the commitlog file, an exception occurs during forwarding to the consumequeue file due to a power outage or other abnormal conditions, resulting in the data not being stored successfully.
  • During flush, if 100MB of messages have accumulated and are ready to be flushed to disk, but a sudden power failure occurs after only 50MB is flushed to the commitlog file, then a message may only be partially written to disk. How should this be handled?
  • Observant readers may notice that in the RocketMQ storage directory, there is a file called checkpoint, which obviously records the flush point of the commitlog and other files. However, the flush point is written to the checkpoint file only after the data is flushed to the commitlog file. If the flush point is lost before being written to the checkpoint file, how should it be handled?

Friendly reminder: dear readers, I suggest you take a moment here to think about these issues before continuing with the rest of this article.

In RocketMQ, there are two scenarios for recovering from Broker abnormal shutdown and normal shutdown. These two scenarios mainly differ in the logic of determining from which file to start recovery. Once the file is determined, the recovery process for the file is as follows:

  • First, try to recover the consumeque file. According to the storage format of consumequeue (8-byte physical offset, 4-byte length, 8-byte tag hash code), locate the physical offset corresponding to the last complete message format and use maxPhysicalOfConsumeque to represent it.
  • Then try to recover the commitlog file. First, determine if the file is a commitlog file based on its magic code. Then, search for the last qualified message according to the storage format of the messages to obtain its physical offset. If the effective offset in the commitlog file is smaller than the maximum physical offset stored in the consumequeue, the excess content in the consumequeue will be deleted. If it is greater than the maximum physical offset in the consumequeue, it means that the content in the consumequeue is less than the content stored in the commitlog file. RocketMQ will then re-forward the excess messages from the commitlog file to ensure consistency between the commitlog and consumequeue files.

In actual production environments, how can we efficiently locate the files that are most likely to be recovered? For example, if there are over 500 commitlog files, should we start from the first file? Of course not. It depends on whether the RocketMQ process exited normally or abnormally.

Locating Files in Normal Exits #

When RocketMQ starts, it creates a file named abort, and this file is deleted during a normal exit. To determine if the RocketMQ process exited abnormally, you only need to check if the abort file exists.

6

The file locating strategy for a normal exit is as follows:

  • When restoring the consumequeue, it is done based on the topic, starting from the first file.
  • When restoring the commitlog, it starts from the third last file and tries to recover from that point.

Locating Files in Abnormal Exits #

RocketMQ can recover from the third last file during a normal exit. This may seem risky, but it is not. In normal situations, once a file is written, it will be flushed to the disk. However, for an abnormal exit, the reason cannot be determined, so it is not possible to be as “casual.” In this case, it must be cautious. So, how can we improve the efficiency of locating files in a cautious manner?

You may have noticed the checkpoint file in the previous image. It should be familiar to you. In RocketMQ, this file stores the last flush timestamp of the commitlog, index, consumequeue, and other files. The file structure is illustrated in the following image:

7

  • physicMsgTimestamp: the last flush timestamp of the commitlog file
  • logicsMsgTimestamp: the last flush timestamp of the consumequeue file
  • indexMsgTimestamp: the last flush timestamp of the indexfile

The flushing mechanism of this file is as follows:

8

From this, we can see that the commitlog is flushed first, and then the checkpoint file is flushed. There can be inconsistencies between the commitlog and checkpoint files, i.e., the data before the flush point stored in the checkpoint file must have been written to the disk, but it does not mean that only the data represented by the flush point in the checkpoint file has been flushed.

Based on the characteristics of the checkpoint file, the file locating strategy for recovery after an abnormal exit is as follows:

  • When restoring the consumequeue, it is done based on the topic, starting from the first file.
  • Starting from the last commitlog file, search backward. While searching, read the storage time of the first message in the file. If this storage time is earlier than the flush time in the checkpoint file, recovery can start from this file. If the storage time of the first message in this file is later than the flush point, it means recovery cannot start from this file, and the previous file should be found because the flush point in the checkpoint file represents 100% reliable messages.

I will not go into much detail about the code for file recovery. Based on the design concept described above, you can explore it on your own, and the effect should be more efficient. The entry point for file recovery is DefaultMessageStore#recover.

How to Use Zero Copy in Java #

In the programming models of file and network operations, the term “zero copy” is frequently mentioned. Here, I won’t explain the specific meaning of zero copy in Java. If you’re not familiar with it, I suggest searching online for more information. Instead, let’s take a look at how RocketMQ uses zero copy based on Netty during message consumption.

9

The key implementation points of zero copy are as follows:

  • In the message reading scenario, first acquire a ByteBuf based on memory mapping. The data in this ByteBuf does not need to be loaded into heap memory.
  • Then wrap the ByteBuf to be sent in Netty’s FileRegion and implement its transferTo method. The underlying implementation is the transferTo method of FileChannel.

The implementation of the transferTo method in ManyMessageTransfer is shown in the following image:

10

This is where I will end the discussion on file-based programming in this article. The design idea of this article is to learn excellent programming techniques from excellent people.