28 Learning File-Based Programming Patterns from RocketMQ (II) #
Sync Flush, Async Flush #
In file-based programming models, in order to improve the performance of file writes, memory mapping mechanisms are usually introduced. However, everything has its pros and cons. With the introduction of memory mapping and page caching mechanisms, data is first written to the page cache without actually being persisted to disk. In this case, should the Broker return success to the client after storing the message sent by the client in the page cache, or only after persisting it to disk?
This is yet another choice, a trade-off between performance and message reliability. To address it, RocketMQ provides two flush strategies: synchronous flush and asynchronous flush.
The term “flush to disk” may sound grand, but there is nothing mysterious or esoteric about it. It simply means synchronizing data from memory to disk, which is done by calling the force method of FileChannel or MappedByteBuffer.
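Here is a minimal sketch of both force calls. It assumes the caller supplies a scratch file, and the class FlushDemo and its method names are made up for illustration. Writing into a MappedByteBuffer or through FileChannel.write only dirties pages in the page cache. The force call is what asks the OS to push those pages to the storage device.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FlushDemo {

    /** Writes through a memory-mapped region, then flushes that region to disk. */
    public static void writeMappedAndForce(Path file, byte[] data) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // A READ_WRITE mapping grows the file to data.length if it is shorter.
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, data.length);
            mapped.put(data);   // only dirties pages in the page cache
            mapped.force();     // writes the mapped region's changes to the storage device
        }
    }

    /** Appends through the channel, then flushes file content (not metadata) to disk. */
    public static void writeChannelAndForce(Path file, byte[] data) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            ByteBuffer buf = ByteBuffer.wrap(data);
            while (buf.hasRemaining()) {
                channel.write(buf); // also lands in the page cache first
            }
            channel.force(false);   // false: content only; true would also flush metadata
        }
    }
}
```

Seen this way, the choice between sync and async flush comes down to when the Broker makes this call relative to answering the client.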
Next, let’s discuss the implementation techniques for sync flush and async flush.
Sync Flush #
Sync flush means that, after receiving a send request from a producer, the Broker first writes the message to memory and then persists it to disk before returning success to the client.
Let’s think about this: during sync flush, does RocketMQ flush each message to disk individually as it is written? The answer is no.
In RocketMQ, the entry point for sync flush is the handleDiskFlush method of CommitLog.
There are two key points here:
- The class responsible for sync flush is GroupCommitService. Have you noticed why its name contains the word “Group”? It stands for group commit: a single flush operation persists more than one message to disk instead of just one.
- It uses a handy programming technique: a CountDownLatch with a timeout. The caller submits an asynchronous request and then calls the await method with a timeout to wait for the result of the asynchronous processing. This turns an asynchronous call into a synchronous one and keeps the business logic decoupled.
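The timed-CountDownLatch pattern can be sketched with plain java.util.concurrent classes. FlushRequest, wakeup, waitForFlush and submitAndWait are hypothetical names loosely modeled on the idea described above. They are not RocketMQ's actual API, and Thread.sleep stands in for the real flush.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FlushRequestDemo {

    /** Hypothetical request object: completed by the flush thread, awaited by the caller. */
    static final class FlushRequest {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean flushOK = false;

        /** Flush-thread side: record the outcome, then release the waiter. */
        void wakeup(boolean ok) {
            this.flushOK = ok;
            latch.countDown();
        }

        /** Caller side: false if the flush failed or did not finish within the timeout. */
        boolean waitForFlush(long timeoutMillis) throws InterruptedException {
            boolean signalled = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return signalled && flushOK;
        }
    }

    /** Hands the request to a background thread, then blocks on it with a timeout. */
    static boolean submitAndWait(ExecutorService flusher, FlushRequest req,
                                 long simulatedFlushMillis, long timeoutMillis)
            throws InterruptedException {
        flusher.execute(() -> {
            try {
                Thread.sleep(simulatedFlushMillis); // stand-in for the real disk flush
                req.wakeup(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                req.wakeup(false);
            }
        });
        return req.waitForFlush(timeoutMillis);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService flusher = Executors.newSingleThreadExecutor();
        try {
            // Flush finishes well within the timeout: prints true.
            System.out.println(submitAndWait(flusher, new FlushRequest(), 10, 2000));
            // Flush takes far longer than the caller is willing to wait: prints false.
            System.out.println(submitAndWait(flusher, new FlushRequest(), 1000, 50));
        } finally {
            flusher.shutdownNow();
        }
    }
}
```

The timeout matters. If the flush thread stalls, the request thread gives up after the deadline and can report a flush timeout to the client instead of blocking forever.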
Next, let’s continue to explore the design philosophy behind group commit.
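A flush request is considered successful when the current flush pointer has reached the end of the message, that is, when it is at or beyond the message's physical offset plus its length. If the check fails, the flush is retried (up to two attempts), because the message may be located in the next mapped file. The flush thread then wakes up the thread waiting on the request and hands it the flush result.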
The core idea of group commit is that flushing is done by the MappedFileQueue.flush method. This method does not flush just one message; it writes all unflushed data to disk at once. Therefore, even with sync flush, not every message triggers its own call to flush. A single call covers every request whose message has already been written. The following flowchart illustrates the design of group commit more intuitively:
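To make the batching effect visible, here is a single-threaded sketch of group commit. Request, append, doCommit and flushEverything are hypothetical names. flushEverything stands in for MappedFileQueue.flush by marking all written bytes as flushed. A real service would instead run doCommit on its own thread and wake each waiter through the CountDownLatch pattern described earlier.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupCommitSketch {

    /** Hypothetical request: "tell me once everything up to nextOffset is on disk". */
    static final class Request {
        final long nextOffset;
        Boolean flushOK; // null until the flush side has processed the request
        Request(long nextOffset) { this.nextOffset = nextOffset; }
    }

    private final List<Request> pending = new ArrayList<>();
    private long wroteWhere = 0;    // end of data appended to the page cache
    private long flushedWhere = 0;  // end of data known to be on disk
    private int flushCalls = 0;

    /** Producer side: append a message of the given size and register a flush request. */
    public synchronized Request append(int messageSize) {
        wroteWhere += messageSize;
        Request req = new Request(wroteWhere);
        pending.add(req);
        return req;
    }

    /** Stand-in for MappedFileQueue.flush: persists ALL unflushed bytes in a single call. */
    private void flushEverything() {
        flushCalls++;
        flushedWhere = wroteWhere;
    }

    /** Flush side: one pass over the accumulated requests. */
    public synchronized void doCommit() {
        for (Request req : pending) {
            boolean ok = flushedWhere >= req.nextOffset;
            if (!ok) {
                flushEverything();                 // one flush also covers later requests
                ok = flushedWhere >= req.nextOffset;
            }
            req.flushOK = ok;                      // a real service would wake the waiter here
        }
        pending.clear();
    }

    public synchronized int flushCalls() { return flushCalls; }

    public static void main(String[] args) {
        GroupCommitSketch store = new GroupCommitSketch();
        Request a = store.append(100);
        Request b = store.append(250);
        Request c = store.append(80);
        store.doCommit();
        System.out.println(a.flushOK + " " + b.flushOK + " " + c.flushOK); // true true true
        System.out.println(store.flushCalls());                              // 1
    }
}
```

A single flush satisfies all three requests. The first unsatisfied request triggers a flush of everything written so far, and the later requests then find the flush pointer already at or past their end offsets.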
Async Flush #
The advantage of sync flush is that no messages are lost. When the client receives success, the message is guaranteed to be persisted to disk, which makes sync flush very reliable. The price is write performance. Because RocketMQ writes messages to the page cache first, the risk of loss is already small. A crash of the Broker process alone does not lose data in the page cache; data is lost only if the machine itself fails (a power loss or an operating system crash) before the data reaches disk. If you can tolerate a small probability of message loss in exchange for better performance, consider async flush.
Async flush means that the Broker returns success as soon as the message is stored in the page cache. Meanwhile, a background thread periodically calls the force method of FileChannel to flush the data in memory to disk. The default interval is 500ms. In RocketMQ, the class that implements async flush is FlushRealTimeService. Given the default interval of 500ms, can you guess whether FlushRealTimeService uses a timer task?
Actually, it does not. Instead, it again relies on the await method of a CountDownLatch with a timeout. The benefit of this design is that the thread sleeps for 500ms when no new messages are written. When a new message arrives, however, the thread can be woken up early, so the message is flushed promptly instead of after the full 500ms.
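The wake-early timed wait can be sketched as follows. A plain CountDownLatch cannot be reset, so this sketch swaps in a fresh latch after each wait. As far as I know, RocketMQ's ServiceThread uses its own resettable latch (CountDownLatch2) for the same purpose. AsyncFlushSketch and its methods are hypothetical, and a counter stands in for the actual force call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncFlushSketch {
    private final long intervalMillis;
    private final AtomicReference<CountDownLatch> waitPoint =
            new AtomicReference<>(new CountDownLatch(1));
    private final AtomicInteger flushCount = new AtomicInteger();
    private volatile boolean stopped = false;
    private final Thread worker = new Thread(this::run, "async-flush-sketch");

    public AsyncFlushSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    public void start() { worker.start(); }

    /** Called by the write path after appending a message: ends the current wait early. */
    public void wakeup() { waitPoint.get().countDown(); }

    private void run() {
        while (!stopped) {
            try {
                // Wait at most intervalMillis; return immediately if wakeup() was called.
                waitPoint.get().await(intervalMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // Reset BEFORE flushing, so a wakeup that arrives during the flush is not lost.
            waitPoint.set(new CountDownLatch(1));
            flushCount.incrementAndGet(); // stand-in for FileChannel.force(false)
        }
    }

    public void shutdown() throws InterruptedException {
        stopped = true;
        wakeup();
        worker.join();
    }

    public int flushCount() { return flushCount.get(); }
}
```

Resetting the latch before the flush, rather than after it, means a wakeup that arrives mid-flush is not lost. At worst, it causes one extra flush.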
File Recovery Mechanism #
First, let’s take a look at RocketMQ’s file forwarding mechanism:
In RocketMQ, data is first written to the commitlog file. The consumequeue, indexFile, and other files are then built from the commitlog file by asynchronous forwarding. Because the forwarding is asynchronous, the data in the commitlog and consumequeue files can become inconsistent. For example, when RocketMQ is shut down, some data may not yet have been forwarded to the consumequeue. How, then, can data consistency be restored when the Broker restarts?
Before explaining RocketMQ’s file recovery mechanism, let’s present a few exception scenarios:
- A message has been written to the commitlog file and flushed synchronously. A power outage or another failure then occurs while the message is being forwarded to the consumequeue file, so the consumequeue entry is never stored.
- During a flush, 100MB of messages have accumulated and are ready to be written to disk, but the power fails after only 50MB has reached the commitlog file. A message may then be only partially written to disk. How should this be handled?
- Observant readers may notice a file named checkpoint in the RocketMQ storage directory. As the name suggests, it records the flush points of the commitlog and other files. However, the flush point is written to the checkpoint file only after the data has been flushed to the commitlog file. If a failure occurs before the flush point reaches the checkpoint file, how should it be handled?
Friendly reminder: dear readers, I suggest you take a moment here to think about these issues before continuing with the rest of this article.
RocketMQ distinguishes two recovery scenarios: recovery after the Broker shuts down normally and recovery after it shuts down abnormally. The two differ mainly in how they decide which file to start recovery from. Once that file is determined, recovery proceeds as follows:
- First, try to recover the consumequeue files. Following the consumequeue storage format (8-byte physical offset, 4-byte length, 8-byte tag hash code), locate the physical offset of the last complete entry, denoted maxPhysicalOfConsumeque.
- Then try to recover the commitlog file. First, use the magic code to check that the content is in commitlog format. Next, scan the messages according to their storage format to find the last valid message and obtain its physical offset. If the valid offset of the commitlog file is smaller than the maximum physical offset stored in the consumequeue, the excess entries in the consumequeue are deleted. If it is larger, the consumequeue holds less content than the commitlog. In that case, RocketMQ re-forwards the extra messages from the commitlog file to make the commitlog and consumequeue files consistent again.
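The comparison step can be sketched for a single consume queue. The 20-byte entry layout (8 + 4 + 8 bytes) follows the format above. Two choices are assumptions of this sketch: the queue's maximum physical offset is taken as the end of the last message (offset plus size), and the scan stops at the first zero-filled entry. RecoverySketch, reconcile and Action are hypothetical names. The commitlog scan (message-by-message validation with the magic code) is reduced to a single commitLogValidEnd value. Since the commitlog is shared by all topics, the value to compare against would be the maximum over all consume queues.

```java
import java.nio.ByteBuffer;

public class RecoverySketch {
    /** One consumequeue entry: 8-byte commitlog offset, 4-byte size, 8-byte tag hash code. */
    static final int CQ_UNIT_SIZE = 20;

    /** Returns the commitlog offset just past the last complete, valid entry (0 if none). */
    static long maxPhysicalOffsetOfConsumeQueue(ByteBuffer cq) {
        long max = 0;
        ByteBuffer buf = cq.duplicate();             // leave the caller's position untouched
        while (buf.remaining() >= CQ_UNIT_SIZE) {    // a trailing partial entry is ignored
            long offset = buf.getLong();
            int size = buf.getInt();
            buf.getLong();                           // tag hash code, not needed here
            if (offset < 0 || size <= 0) {
                break;                               // zero-filled space: end of written data
            }
            max = offset + size;
        }
        return max;
    }

    enum Action { TRUNCATE_CONSUME_QUEUE, REDISPATCH_FROM_COMMITLOG, NOTHING }

    /** Compares the commitlog's valid end offset with what the consumequeue references. */
    static Action reconcile(long commitLogValidEnd, long cqMaxPhysicalOffset) {
        if (commitLogValidEnd < cqMaxPhysicalOffset) {
            return Action.TRUNCATE_CONSUME_QUEUE;    // consumequeue points past valid data
        } else if (commitLogValidEnd > cqMaxPhysicalOffset) {
            return Action.REDISPATCH_FROM_COMMITLOG; // some messages were never forwarded
        }
        return Action.NOTHING;
    }
}
```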
In a real production environment, how can the file to start recovery from be located efficiently? For example, if there are more than 500 commitlog files, should recovery start from the first one? Of course not. The answer depends on whether the RocketMQ process exited normally or abnormally.
Locating Files in Normal Exits #
When RocketMQ starts, it creates a file named abort, and this file is deleted during a normal exit. To determine whether the RocketMQ process exited abnormally, you only need to check whether the abort file exists.
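Here is a sketch of the abort-file convention. It assumes the file lives directly in the store directory. Only the file name abort comes from the text; AbortFileSketch and its methods are hypothetical. Note the order at startup: check for the file first, then create it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class AbortFileSketch {
    private final Path abortFile;

    public AbortFileSketch(Path storeDir) {
        this.abortFile = storeDir.resolve("abort");
    }

    /** Call at startup BEFORE markRunning(): true means the last run did not exit cleanly. */
    public boolean lastExitWasAbnormal() {
        return Files.exists(abortFile);
    }

    /** Call at startup after the check: marks the store as "in use". */
    public void markRunning() throws IOException {
        if (!Files.exists(abortFile)) {
            Files.createFile(abortFile);
        }
    }

    /** Call as the very last step of a normal shutdown, after everything is flushed. */
    public void markCleanShutdown() throws IOException {
        Files.deleteIfExists(abortFile);
    }
}
```

The file is deleted only as the last step of a clean shutdown, so a crash, a kill -9 or a power loss leaves it behind.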
The file locating strategy for a normal exit is as follows:
- The consumequeue files are restored per topic, starting from the first file.
- The commitlog files are restored starting from the third-to-last file.
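The "third-to-last file" rule amounts to a one-line index calculation. The method name is hypothetical, and the list is assumed to be sorted from the oldest file to the newest. Falling back to the first file when there are fewer than three files is also an assumption of this sketch.

```java
import java.util.List;

public class NormalExitLocator {

    /**
     * Index of the commitlog file to start recovery from after a normal exit:
     * the third-to-last file, the first file if there are fewer than three, or -1 if none.
     */
    static int startIndexAfterNormalExit(List<String> sortedCommitLogFiles) {
        if (sortedCommitLogFiles.isEmpty()) {
            return -1; // nothing to recover
        }
        return Math.max(0, sortedCommitLogFiles.size() - 3);
    }
}
```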
Locating Files in Abnormal Exits #
After a normal exit, RocketMQ can start recovery from the third-to-last file. This may look risky, but it is not: in the normal case, a file that has been fully written has also been flushed to disk. After an abnormal exit, however, the cause cannot be known, so recovery cannot be that relaxed and must be cautious. How, then, can files be located efficiently while staying cautious?
You may have noticed the checkpoint file in the previous image; it should already be familiar. In RocketMQ, this file stores the last flush timestamps of the commitlog, index, consumequeue, and other files. Its structure is illustrated in the following image:
- physicMsgTimestamp: the last flush timestamp of the commitlog file
- logicsMsgTimestamp: the last flush timestamp of the consumequeue file
- indexMsgTimestamp: the last flush timestamp of the indexfile
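Reading and writing the three timestamps can be sketched with a ByteBuffer. The layout used here is an assumption for illustration: three consecutive 8-byte longs at offsets 0, 8 and 16, in the order listed above. CheckpointSketch, encode and decode are hypothetical names.

```java
import java.nio.ByteBuffer;

public class CheckpointSketch {
    long physicMsgTimestamp;   // last flush time of the commitlog
    long logicsMsgTimestamp;   // last flush time of the consumequeue
    long indexMsgTimestamp;    // last flush time of the index file

    /** Serializes the timestamps as three big-endian longs (assumed layout). */
    ByteBuffer encode() {
        ByteBuffer buf = ByteBuffer.allocate(3 * Long.BYTES);
        buf.putLong(0, physicMsgTimestamp);
        buf.putLong(8, logicsMsgTimestamp);
        buf.putLong(16, indexMsgTimestamp);
        return buf;
    }

    /** Reads the timestamps back using absolute gets, leaving the buffer position alone. */
    static CheckpointSketch decode(ByteBuffer buf) {
        CheckpointSketch cp = new CheckpointSketch();
        cp.physicMsgTimestamp = buf.getLong(0);
        cp.logicsMsgTimestamp = buf.getLong(8);
        cp.indexMsgTimestamp = buf.getLong(16);
        return cp;
    }
}
```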
The flushing mechanism of this file is as follows:
From this, we can see that the commitlog is flushed first and the checkpoint file afterwards. The commitlog and checkpoint files can therefore be inconsistent, but only in one direction. All data before the flush point stored in the checkpoint file is guaranteed to be on disk, yet more data than that point represents may already have been flushed.
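The ordering argument can be sketched with two FileChannels. flushThenCheckpoint is a hypothetical helper that collapses both steps into one method. It stores only the commitlog timestamp, in the first 8 bytes of the checkpoint file (an assumed layout). What matters is that the checkpoint is written only after the data it describes has been forced.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class FlushOrderSketch {

    /**
     * Hypothetical helper: forces the commitlog first, then records the flush timestamp in the
     * first 8 bytes of the checkpoint file and forces that. The checkpoint can lag behind the
     * data, but it can never claim more than what is actually on disk.
     */
    static void flushThenCheckpoint(FileChannel commitLog, FileChannel checkpoint,
                                    long flushTimestamp) throws IOException {
        // 1. Make the commitlog data durable.
        commitLog.force(false);

        // 2. Record the flush point (the absolute put leaves position 0, limit 8).
        ByteBuffer ts = ByteBuffer.allocate(Long.BYTES).putLong(0, flushTimestamp);
        while (ts.hasRemaining()) {
            checkpoint.write(ts, ts.position()); // positional write; file offset == buffer offset
        }

        // 3. Make the flush point itself durable.
        checkpoint.force(false);
    }
}
```

Suppose the process dies after step 1 but before step 3. The checkpoint then still holds the older timestamp, so it understates the flushed data and never overstates it. This is exactly the inconsistency described above.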
Based on these properties of the checkpoint file, the strategy for locating files during recovery after an abnormal exit is as follows:
- The consumequeue files are restored per topic, starting from the first file.
- The commitlog files are searched from the last file toward earlier ones. For each file, read the storage time of its first message. If it is earlier than the flush time recorded in the checkpoint file, recovery can start from this file. If it is later than the flush point, recovery cannot start from this file, and the previous file must be examined instead. Only messages before the flush point in the checkpoint file are 100% guaranteed to be on disk.
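The backward search can be sketched as a loop over the store times of each file's first message. The input array is hypothetical; real code would read the first message of each file. Treating "earlier than" as "at or before" is an assumption of this sketch, and so is falling back to the oldest file when no file qualifies.

```java
public class AbnormalExitLocator {

    /**
     * Returns the index of the commitlog file to start recovery from after an abnormal exit,
     * or -1 if there are no files.
     *
     * @param firstMsgStoreTimestamps store time of the first message in each file, oldest first
     * @param checkpointTimestamp     commitlog flush timestamp recorded in the checkpoint file
     */
    static int startIndexAfterAbnormalExit(long[] firstMsgStoreTimestamps, long checkpointTimestamp) {
        if (firstMsgStoreTimestamps.length == 0) {
            return -1;
        }
        // Walk from the newest file toward older ones.
        for (int i = firstMsgStoreTimestamps.length - 1; i >= 0; i--) {
            if (firstMsgStoreTimestamps[i] <= checkpointTimestamp) {
                // This file starts at or before the flush point, so everything earlier is on disk.
                return i;
            }
        }
        return 0; // no file qualifies: fall back to the oldest file (assumption)
    }
}
```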
I will not go into detail about the file recovery code here. With the design described above in mind, you can explore it on your own, and you should find the code much easier to follow. The entry point for file recovery is DefaultMessageStore#recover.
How to Use Zero Copy in Java #
In the programming models of file and network operations, the term “zero copy” is frequently mentioned. Here, I won’t explain the specific meaning of zero copy in Java. If you’re not familiar with it, I suggest searching online for more information. Instead, let’s take a look at how RocketMQ uses zero copy based on Netty during message consumption.
The key implementation points of zero copy are as follows:
- In the message-reading scenario, first obtain a ByteBuffer backed by memory mapping. Its data does not need to be loaded into heap memory.
- Then wrap the buffers to be sent in an implementation of Netty's FileRegion and implement its transferTo method. Netty calls this method with the socket channel as the target, and the implementation writes the memory-mapped buffers directly to that channel instead of copying them into heap memory first.
The implementation of the transferTo
method in ManyMessageTransfer
is shown in the following image:
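Two JDK building blocks behind this kind of zero copy can be sketched without Netty. The first writes memory-mapped buffers straight to a target channel, which is the shape a FileRegion.transferTo implementation takes. The second is FileChannel.transferTo, which may use sendfile on some platforms. ZeroCopySketch and its methods are hypothetical; this is not RocketMQ's ManyMessageTransfer.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

public class ZeroCopySketch {

    /** Maps [position, position + length) of a file read-only; the mapping outlives the channel. */
    static MappedByteBuffer map(Path file, long position, long length) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            return ch.map(FileChannel.MapMode.READ_ONLY, position, length);
        }
    }

    /**
     * Writes a small header, then memory-mapped message buffers, straight to the target channel.
     * This code never copies the bodies into a heap array (the target channel itself might).
     * Assumes a blocking target; Netty instead calls transferTo repeatedly on partial writes.
     */
    static long transferTo(ByteBuffer header, List<? extends ByteBuffer> mappedBodies,
                           WritableByteChannel target) throws IOException {
        long written = 0;
        while (header.hasRemaining()) {
            written += target.write(header);
        }
        for (ByteBuffer body : mappedBodies) {
            while (body.hasRemaining()) {
                written += target.write(body);
            }
        }
        return written;
    }

    /** The other JDK route: let FileChannel.transferTo move the bytes to the target. */
    static long sendWholeFile(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            long pos = 0;
            long size = ch.size();
            while (pos < size) {
                pos += ch.transferTo(pos, size - pos, target);
            }
            return pos;
        }
    }
}
```

Neither route is copy-free at the hardware level; the kernel still moves bytes from the page cache toward the socket. What they avoid is an extra copy of the message bytes through a JVM heap array.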
This is where I will end the discussion on file-based programming in this article. The design idea of this article is to learn excellent programming techniques from excellent people.