
27 Learning Programming Patterns Based on Files from RocketMQ I #

File-based Programming with Message Storage Format #

Learning from the Design of Commitlog Files #

As we know, RocketMQ stores full messages in commitlog files, and the size of each message varies. So how are messages organized? After messages are written to files, how do we determine the start and end of each message?

Firstly, in file-based programming, we need to define a message storage format to represent a complete message. For example, the message storage format in RocketMQ is shown in the following diagram:

(Figure 1: RocketMQ message storage format)

From here, we can derive a general practice for defining a data storage format: the storage protocol usually follows Header + Body, where the Header has a fixed length and stores basic information, while the Body stores the actual data. In RocketMQ’s message storage protocol, we can consider the four bytes representing the message size as the Header, and the following fields as business attributes of the message, assembled according to a specified format.

Based on the Header + Body protocol, extracting a message usually involves two steps: first, the fixed-length Header is read into a ByteBuffer; then, using the length recorded in the Header, that many bytes are read from the start of the message, and each field is parsed according to the predefined format. In RocketMQ this means first reading the message length, then reading the full message of that length and decoding its parts.
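The two-step extraction can be sketched with a simplified, hypothetical format: a 4-byte total-size header followed by the raw body. Real RocketMQ messages carry many more fields; this only illustrates the Header + Body idea.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MessageCodec {
    // Hypothetical simplified format: 4-byte total size (the Header) + body bytes.
    public static ByteBuffer encode(String body) {
        byte[] data = body.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + data.length);
        buf.putInt(4 + data.length); // total size, including the 4-byte header itself
        buf.put(data);
        buf.flip();
        return buf;
    }

    // Step 1: read the fixed-length Header (the size field).
    // Step 2: read exactly size - 4 more bytes and parse them as the Body.
    public static String decode(ByteBuffer buf) {
        int totalSize = buf.getInt();
        byte[] body = new byte[totalSize - 4];
        buf.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decode(encode("hello"))); // prints "hello"
    }
}
```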

But here comes a problem: how do we determine the start of a message? Do we have to start searching from the beginning of the file?

Just as a relational database introduces an ID field for each data record, in the file-based programming model, an identity label is also introduced for each message, which is the physical offset of the message, i.e., the starting position of the message stored in the file.

The design of physical offsets is shown in the following diagram:

(Figure 2: the design of physical offsets)

With the starting offset of a file + SIZE, it becomes easy to extract a complete message from a file.
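As a sketch, assuming a fixed file size and RocketMQ’s convention of naming each commitlog file after the physical offset of its first byte, locating the file and the in-file position for a given physical offset is simple arithmetic (the 1 GB size below is illustrative):

```java
public class OffsetLocator {
    // Assumed fixed commitlog file size for this sketch: 1 GB.
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    // Each file is named after the offset of its first byte, so the file
    // containing a given physical offset starts at the largest multiple of
    // FILE_SIZE that does not exceed the offset.
    public static long fileStartOffset(long physicalOffset) {
        return physicalOffset - (physicalOffset % FILE_SIZE);
    }

    // Position of the message inside that file.
    public static long positionInFile(long physicalOffset) {
        return physicalOffset % FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = FILE_SIZE + 4096;
        System.out.println(fileStartOffset(offset)); // 1073741824
        System.out.println(positionInFile(offset));  // 4096
    }
}
```

With the file start offset and the in-file position known, reading SIZE bytes from that position yields the complete message.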

From the organization of commitlog files, we can see that in file-based programming, a magic number is usually written to mark the start of each message, and a special padding magic number marks the unused tail of a file. For example, if the remaining space of a file cannot accommodate a complete message, the message is not split across files; instead, the remaining space is filled with a PAD marker and the message is written to the next file.

File Storage Design from the Perspective of Consumequeue #

Using the physical offset to locate a message in the commitlog file is very convenient, but it’s not that convenient to query messages based on topics. Therefore, to facilitate message querying by topic, a consumequeue file is introduced.

(Figure 3: consumequeue file design)

The design of the consumequeue file is quite clever. Each entry has a fixed length of 20 bytes: 8 bytes for the commitlog physical offset, 4 bytes for the message length, and 8 bytes for the tag hashcode. The tag hashcode is stored instead of the original tag string precisely to keep each entry at a fixed length, which allows entries to be located the way array elements are indexed, greatly improving the read performance of the consumequeue file.

Therefore, when designing file storage, it is necessary to design some indexes that ensure fixed entry lengths, so that fast data positioning can be achieved using a similar way of accessing arrays.
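A minimal sketch of this array-style addressing, using the 20-byte entry size described above: the byte position of the n-th entry is computed directly, with no scanning.

```java
public class ConsumeQueueIndex {
    // Each consumequeue entry is fixed at 20 bytes:
    // 8-byte commitlog offset + 4-byte message size + 8-byte tag hashcode.
    static final int ENTRY_SIZE = 8 + 4 + 8;

    // With fixed-length entries, locating the n-th entry is O(1),
    // exactly like indexing into an array.
    public static long entryPosition(long logicIndex) {
        return logicIndex * ENTRY_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(entryPosition(0));   // 0
        System.out.println(entryPosition(100)); // 2000
    }
}
```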

Memory Mapping and Page Cache #

After settling the data storage format and unique identification, the next step is to improve write performance. In the file-based programming model, fixed-length files are usually adopted, which also makes it easy to delete expired data by removing whole files. For example, RocketMQ’s commitlog folder contains many files of equal size.

(Figure 4: equally sized files in the commitlog folder)

Using files with a fixed length is mainly for facilitating memory mapping. Through memory mapping, disk files are mapped to memory, allowing disk access in a way similar to accessing memory, greatly improving the performance of file operations.

Here is an example of using memory mapping in Java:

// Code example
FileChannel fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);

The implementation points are as follows:

  • First, it is necessary to construct a file writing channel FileChannel using RandomAccessFile, which provides a channel for block-based writing.
  • Use the map method of FileChannel to create a memory-mapped buffer.
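The two steps above can be put together in a small self-contained sketch (the temp-file name and the 1024-byte size are arbitrary for the demo). Note that writing to the mapped buffer writes to the page cache; force() asks the OS to flush the dirty pages to disk.

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class MmapWriteDemo {
    public static long demo() throws Exception {
        File file = File.createTempFile("commitlog-demo", ".bin");
        file.deleteOnExit();
        int fileSize = 1024; // small fixed file size for the demo

        try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
             FileChannel channel = raf.getChannel()) {
            // Mapping with a fixed size extends the file to that size.
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);

            // Writing to the mapped buffer writes to the page cache, not the disk.
            mapped.put("hello".getBytes(StandardCharsets.UTF_8));

            // force() flushes the dirty pages to disk.
            mapped.force();
        }
        return file.length(); // 1024: the mapping extended the file
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```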

In Linux, the MappedByteBuffer is effectively backed by the page cache. Linux’s memory-management strategy maximizes the use of the machine’s physical memory: file data is kept resident in memory as the page cache, and only when memory runs low does the operating system use cache-replacement algorithms such as LRU to evict infrequently used pages. The operating system manages this memory automatically, so the user does not need to. If a read misses the page cache, a page fault is triggered and the operating system automatically loads the content from the file into the page cache.

Memory mapping maps disk data into memory, but data written through the mapping is not immediately synchronized to disk: it must be flushed periodically, or the operating system is left to decide when to persist it. Because the page cache is owned by the operating system rather than the process, if the RocketMQ Broker process exits abnormally, data already written to the page cache is not lost; the operating system will still periodically persist it to disk. However, in situations such as a power failure, data that has not yet been flushed from the page cache may be lost.

Sequential Writing #

Another design principle for improving disk write performance is sequential writing, which is widely used in file-based storage models. Consider the purpose of MySQL’s redo log: in MySQL’s InnoDB storage engine, the InnoDB buffer pool caches disk file blocks. When an update statement modifies data, the change is first made in memory and then written to the redo log file (a crucial step is forcing the redo log to be flushed and synchronized to disk), while the data file itself is not synchronized at this point. The process is shown in the following diagram:

(Figure 5: redo log write flow in MySQL InnoDB)

If the redo log were not introduced, updating the order and user tables would first update the InnoDB buffer pool (memory) and then flush the changes to disk. Since different tables correspond to different data files, flushing memory to disk on every update would produce a large number of random disk writes and therefore poor performance. To avoid this, a sequentially written redo log is introduced first, and the in-memory data is synchronized to the data files periodically. Although this adds an extra sequential write to the redo log, the overall performance is better. This also shows that sequential writes perform far better than random writes.

Therefore, the file-based programming model must be designed around sequential, append-only writing: data is only ever appended, never updated in place.
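A minimal append-only writer sketch, using a heap ByteBuffer to stand in for a mapped file region (the class and field names are illustrative, not RocketMQ’s): the write position only ever moves forward, and the offset returned by each append serves as the record’s identity.

```java
import java.nio.ByteBuffer;

// Append-only writing: a single write position moves monotonically forward;
// existing bytes are never updated in place.
public class AppendOnlyLog {
    private final ByteBuffer buffer; // stands in for a mapped file region
    private int wrotePosition = 0;   // next write position, only ever increases

    public AppendOnlyLog(int capacity) {
        this.buffer = ByteBuffer.allocate(capacity);
    }

    // Appends a record and returns the offset at which it was written.
    public int append(byte[] record) {
        int offset = wrotePosition;
        buffer.position(offset);
        buffer.put(record);
        wrotePosition += record.length;
        return offset;
    }

    public int wrotePosition() {
        return wrotePosition;
    }

    public static void main(String[] args) {
        AppendOnlyLog log = new AppendOnlyLog(64);
        System.out.println(log.append("abc".getBytes())); // 0
        System.out.println(log.append("de".getBytes()));  // 3
        System.out.println(log.wrotePosition());          // 5
    }
}
```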

Reference Counter #

In NIO-based file-oriented programming, code is generally written against ByteBuffer, and read operations usually use the slice method. The sliced buffer and the original share the same underlying memory but maintain independent position pointers. A usage example is as follows:

(Figure 6: slicing a ByteBuffer from a memory-mapped file)

The method above reads a specified length of data from a mapped file, such as a commitlog or consumequeue file, by slicing a ByteBuffer from the MappedByteBuffer. The two ByteBuffer objects share the same internal storage but maintain independent pointers. The advantage is that no data is copied in memory; the downside is that management becomes harder, in particular releasing the ByteBuffer objects becomes complicated.
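A small self-contained sketch of slice() semantics (not RocketMQ code): the slice starts at the original buffer’s current position, shares the backing memory, and has its own independent position and limit.

```java
import java.nio.ByteBuffer;

public class SliceDemo {
    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.allocate(16);
        original.put(new byte[] {1, 2, 3, 4});
        original.position(1);

        // slice() creates a new buffer sharing the same backing memory,
        // starting at the original's current position, with independent
        // position/limit pointers.
        ByteBuffer slice = original.slice();

        System.out.println(slice.get(0)); // 2: the slice starts at original index 1

        // Writing through the slice is visible through the original buffer.
        slice.put(0, (byte) 9);
        System.out.println(original.get(1)); // 9
    }
}
```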

It is necessary to keep track of how many times the MappedByteBuffer has been sliced. The MappedByteBuffer cannot be closed (unmapped) until the lifecycles of all these slices have ended; otherwise the slices would point at memory that is no longer valid, causing unpredictable errors. How does RocketMQ solve this problem?

The solution is to introduce a reference counter. That is, each time a slice is performed, the reference counter is incremented by one, which means the reference count of the ByteBuffer is increased. The MappedFile.selectMappedBuffer method in RocketMQ implements reference counting as follows:

(Figure 7: reference counting in MappedFile.selectMappedBuffer)

Combining with the MappedFile.selectMappedBuffer method, we will explain its implementation points:

  1. When slicing the MappedByteBuffer, the hold method is called to increment the reference count by one, recording that one more ByteBuffer references it.
  2. The returned ByteBuffer is wrapped in a SelectMappedBufferResult. When the user is finished with it, it is released: the release method of ReferenceResource is called and the reference count is decremented.
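The two points above can be sketched as a simplified reference counter, loosely modeled on RocketMQ’s ReferenceResource. The hold/release names match RocketMQ; the other details (field names, the cleanup placeholder) are simplified for illustration and omit RocketMQ’s retry and timeout handling.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified sketch of a reference-counted resource, loosely modeled on
// RocketMQ's ReferenceResource. Not thread-safe cleanup; illustration only.
public class RefCountedResource {
    private final AtomicLong refCount = new AtomicLong(1); // 1 for the owner itself
    private volatile boolean available = true;
    private volatile boolean cleaned = false;

    // Called before handing out a slice: take a reference if still available.
    public boolean hold() {
        if (!available) {
            return false;
        }
        refCount.incrementAndGet();
        return true;
    }

    // Called when a user is done with a slice; the underlying resource is
    // only cleaned up once the last reference is released.
    public void release() {
        if (refCount.decrementAndGet() <= 0 && !cleaned) {
            cleaned = true; // e.g. unmap the MappedByteBuffer here
        }
    }

    // The owner marks the resource unavailable and drops its own reference.
    public void shutdown() {
        available = false;
        release();
    }

    public boolean isCleaned() {
        return cleaned;
    }

    public static void main(String[] args) {
        RefCountedResource res = new RefCountedResource();
        res.hold();                          // a slice is handed out: count = 2
        res.shutdown();                      // owner releases: count = 1, not cleaned yet
        System.out.println(res.isCleaned()); // false: a slice is still in use
        res.release();                       // slice released: count = 0, cleaned
        System.out.println(res.isCleaned()); // true
    }
}
```

The key property is that cleanup happens only when the last holder releases, so no slice can ever observe unmapped memory.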