18 Buffer: We Don’t Produce Data, We Only Transport Data

Buffer is a byte container that is present in NIO frameworks such as Netty. For example, Java NIO has ByteBuffer and Netty4 has ByteBuf. Dubbo abstracts the Buffer design from underlying NIO frameworks with the ChannelBuffer interface, whose subclasses are shown in the following diagram:

Drawing 0.png

ChannelBuffer inheritance diagram

In the following sections, we will introduce the ChannelBuffer interface and its subclasses, starting from the top-level ChannelBuffer interface, and going through each subclass until the bottom-level implementations.

ChannelBuffer Interface

The design of the ChannelBuffer interface is similar to that of the ByteBuf abstract class in Netty4: it has the same readerIndex and writerIndex concepts, and its core methods are also very similar.

  • getBytes(), setBytes() methods: Read from or write to the current ChannelBuffer at a specified position, without modifying readerIndex or writerIndex.
  • readBytes(), writeBytes() methods: Also read from or write to the current ChannelBuffer, but readBytes() reads data starting at readerIndex and moves the readerIndex pointer forward, while writeBytes() writes data starting at writerIndex and moves the writerIndex pointer forward.
  • markReaderIndex(), markWriterIndex() methods: Record the current positions of the readerIndex and writerIndex pointers. They are usually used together with resetReaderIndex() and resetWriterIndex(): resetReaderIndex() moves the readerIndex pointer back to the position recorded by markReaderIndex(), and resetWriterIndex() does the same for the writerIndex pointer.
  • capacity(), clear(), copy(), and other auxiliary methods: Return the ChannelBuffer’s capacity, clear its data, and copy its data, respectively. They are not explained further here.
  • factory() method: Returns the factory object used to create ChannelBuffer objects. The ChannelBufferFactory interface defines several overloaded getBuffer() methods for creating ChannelBuffers. The following diagram shows the implementations of the ChannelBufferFactory interface, all of which are singletons.

Drawing 1.png

ChannelBufferFactory inheritance diagram
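As a rough analogy for the difference between the get/set and read/write method families in the list above, the following sketch uses Java NIO’s ByteBuffer, which distinguishes absolute access (explicit index, cursor untouched) from relative access (starts at the cursor and advances it). Note that ByteBuffer keeps a single position rather than separate readerIndex and writerIndex pointers, so this only illustrates the idea and is not Dubbo code; the class name AbsoluteVsRelativeDemo is made up for this example.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class AbsoluteVsRelativeDemo {

    /** Returns {absoluteValue, positionAfterAbsolute, relativeValue, positionAfterRelative, positionAfterReset}. */
    static int[] run() {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {10, 20, 30, 40});

        // Absolute read: like getBytes(), it takes an explicit index and leaves the cursor alone.
        int absoluteValue = buf.get(2);
        int positionAfterAbsolute = buf.position();

        // Remember the cursor, similar in spirit to markReaderIndex().
        buf.mark();

        // Relative read: like readBytes(), it starts at the cursor and advances it.
        int relativeValue = buf.get();
        int positionAfterRelative = buf.position();

        // Roll the cursor back to the mark, similar in spirit to resetReaderIndex().
        buf.reset();
        int positionAfterReset = buf.position();

        return new int[] {absoluteValue, positionAfterAbsolute, relativeValue,
                positionAfterRelative, positionAfterReset};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

Only the relative read and the reset change the position; the absolute read at index 2 leaves it where it was.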

The AbstractChannelBuffer abstract class implements most of the methods in the ChannelBuffer interface. Its core lies in maintaining the following four indexes.

  • readerIndex, writerIndex (int type): When data is read using the readBytes() method and its overloads, the readerIndex position is moved. When data is written using the writeBytes() method and its overloads, the writerIndex position is moved.
  • markedReaderIndex, markedWriterIndex (int type): Record the readerIndex and writerIndex positions so that the pointers can later be rolled back to the marked positions. They back the markReaderIndex(), resetReaderIndex(), markWriterIndex(), and resetWriterIndex() methods introduced earlier.

The various overloads of the readBytes() and writeBytes() methods in AbstractChannelBuffer eventually use the getBytes() and setBytes() methods to read and write data. These methods are implemented in subclasses of AbstractChannelBuffer. The following code example demonstrates reading and writing a byte array:

public void readBytes(byte[] dst, int dstIndex, int length) {
    // Check if there are enough readable bytes
    checkReadableBytes(length);
    // Read length bytes from the readerIndex position in the current ChannelBuffer to the dst array starting from dstIndex to dstIndex+length
    getBytes(readerIndex, dst, dstIndex, length);
    // Move the readerIndex pointer forward by length bytes
    readerIndex += length;
}

public void writeBytes(byte[] src, int srcIndex, int length) {
    // Write data from the src array, starting from srcIndex to srcIndex+length, to the current buffer starting from the writerIndex position to writerIndex+length
    setBytes(writerIndex, src, srcIndex, length);
    // Move the writerIndex pointer forward by length bytes
    writerIndex += length;
}

Analysis of Buffer Implementations

After understanding the core methods of the ChannelBuffer interface and the common implementations of AbstractChannelBuffer, let’s take a look at the specific implementations of ChannelBuffer.

HeapChannelBuffer is a ChannelBuffer implementation based on a byte array. Its array field (of type byte[]) is where the data is actually stored. Its setBytes() and getBytes() methods are implemented with System.arraycopy(), as follows:

public void setBytes(int index, byte[] src, int srcIndex, int length) {
    System.arraycopy(src, srcIndex, array, index, length);
}

public void getBytes(int index, byte[] dst, int dstIndex, int length) {
    System.arraycopy(array, index, dst, dstIndex, length);
}
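The division of labor between AbstractChannelBuffer and HeapChannelBuffer can be sketched in a few lines. The following is a simplified, self-contained model, not Dubbo’s source: SketchBuffer, HeapSketchBuffer, and HeapSketchDemo are hypothetical names, only the byte-array overloads are modeled, and the inline bounds check stands in for checkReadableBytes().

```java
/** Simplified stand-in for AbstractChannelBuffer: owns the indexes, delegates storage access. */
abstract class SketchBuffer {
    private int readerIndex;
    private int writerIndex;
    private int markedReaderIndex;

    // Storage-specific primitives; they never touch the indexes.
    abstract void getBytes(int index, byte[] dst, int dstIndex, int length);
    abstract void setBytes(int index, byte[] src, int srcIndex, int length);

    int readerIndex() { return readerIndex; }
    int writerIndex() { return writerIndex; }
    int readableBytes() { return writerIndex - readerIndex; }

    void markReaderIndex() { markedReaderIndex = readerIndex; }
    void resetReaderIndex() { readerIndex = markedReaderIndex; }

    void readBytes(byte[] dst, int dstIndex, int length) {
        if (readableBytes() < length) {
            throw new IndexOutOfBoundsException("not enough readable bytes");
        }
        getBytes(readerIndex, dst, dstIndex, length);
        readerIndex += length;
    }

    void writeBytes(byte[] src, int srcIndex, int length) {
        setBytes(writerIndex, src, srcIndex, length);
        writerIndex += length;
    }
}

/** Simplified stand-in for HeapChannelBuffer: the storage is a plain byte array. */
class HeapSketchBuffer extends SketchBuffer {
    private final byte[] array;

    HeapSketchBuffer(int capacity) { this.array = new byte[capacity]; }

    @Override
    void getBytes(int index, byte[] dst, int dstIndex, int length) {
        System.arraycopy(array, index, dst, dstIndex, length);
    }

    @Override
    void setBytes(int index, byte[] src, int srcIndex, int length) {
        System.arraycopy(src, srcIndex, array, index, length);
    }
}

class HeapSketchDemo {
    public static void main(String[] args) {
        SketchBuffer buf = new HeapSketchBuffer(8);
        buf.writeBytes(new byte[] {1, 2, 3, 4}, 0, 4);   // writerIndex moves from 0 to 4

        byte[] out = new byte[2];
        buf.markReaderIndex();                            // remember readerIndex = 0
        buf.readBytes(out, 0, 2);                         // readerIndex moves from 0 to 2
        System.out.println(out[0] + "," + out[1] + " readerIndex=" + buf.readerIndex());

        buf.resetReaderIndex();                           // readerIndex rolls back to 0
        System.out.println("readerIndex=" + buf.readerIndex());
    }
}
```

The base class never touches the storage and the subclass never touches the indexes, which is why a different storage backend only needs to supply its own getBytes() and setBytes().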

The ChannelBufferFactory implementation corresponding to HeapChannelBuffer is HeapChannelBufferFactory. Its getBuffer() method creates a HeapChannelBuffer object with the specified capacity using the ChannelBuffers utility class. Here are two examples of overloaded getBuffer() methods:

@Override
public ChannelBuffer getBuffer(int capacity) {
    // Create a new HeapChannelBuffer with a byte array of length capacity
    return ChannelBuffers.buffer(capacity);
}

@Override
public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
    // Create a new HeapChannelBuffer and copy the data in the array from offset to offset+length to the new HeapChannelBuffer
    return ChannelBuffers.wrappedBuffer(array, offset, length);
}
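The comment on the second overload says that the bytes are copied into the new buffer rather than shared with the caller’s array. Assuming that behavior, its effect can be illustrated with plain arrays; copyRange() below is a hypothetical helper written for this example, not a Dubbo method.

```java
class CopyOnWrapDemo {

    /** Hypothetical helper: copies array[offset, offset + length) into a fresh array. */
    static byte[] copyRange(byte[] array, int offset, int length) {
        byte[] dest = new byte[length];
        System.arraycopy(array, offset, dest, 0, length);
        return dest;
    }

    public static void main(String[] args) {
        byte[] source = {1, 2, 3, 4, 5};
        byte[] copy = copyRange(source, 1, 3);   // holds 2, 3, 4

        source[1] = 99;                          // a later change to the source array...
        System.out.println(copy[0]);             // ...does not show up in the copy
    }
}
```

The cost of such a copy is one extra allocation per call; the benefit is that the caller can keep reusing its own array without corrupting the buffer’s contents.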

Other overloaded getBuffer() methods are not shown here. If you are interested, you can refer to the source code.

DynamicChannelBuffer can be regarded as a decorator for other ChannelBuffers: it adds dynamic capacity expansion to the ChannelBuffer it wraps. DynamicChannelBuffer has two core fields:

  • buffer (ChannelBuffer type): the ChannelBuffer being decorated, a HeapChannelBuffer by default.
  • factory (ChannelBufferFactory type): the factory used to create the decorated ChannelBuffer, HeapChannelBufferFactory by default.

The ensureWritableBytes() method in DynamicChannelBuffer is worth mentioning. This method implements the dynamic capacity extension feature. Before writing data, this method is called to determine if the available space is sufficient. The method is called at the following location:

Drawing 2.png

If the ensureWritableBytes() method detects that the space in the underlying ChannelBuffer object is insufficient, it creates a new ChannelBuffer whose capacity is obtained by doubling the original capacity until the pending write fits, copies the data from the original ChannelBuffer to the new one, and finally assigns the new ChannelBuffer to the buffer field, completing the capacity expansion. The specific implementation of the ensureWritableBytes() method is as follows:

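public void ensureWritableBytes(int minWritableBytes) {
    // Enough writable space already: nothing to do
    if (minWritableBytes <= writableBytes()) {
        return;
    }
    // Start from the current capacity (or 1 if the buffer is empty)
    int newCapacity;
    if (capacity() == 0) {
        newCapacity = 1;
    } else {
        newCapacity = capacity();
    }
    // Keep doubling until the pending write fits
    int minNewCapacity = writerIndex() + minWritableBytes;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }
    // Create the larger buffer, copy the bytes written so far, and switch over to it
    ChannelBuffer newBuffer = factory().getBuffer(newCapacity);
    newBuffer.writeBytes(buffer, 0, writerIndex());
    buffer = newBuffer;
}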

To summarize, ensureWritableBytes() returns immediately when the buffer already has at least minWritableBytes of writable space. Otherwise, it starts from the current capacity (or from 1 if the capacity is 0) and keeps doubling it until it is no smaller than minNewCapacity, that is, writerIndex plus minWritableBytes.

It then asks the factory for a new buffer with that capacity, copies the bytes already written (from index 0 up to writerIndex) into it with writeBytes(), and finally points the buffer field at the new ChannelBuffer.
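To make the doubling loop concrete, here is a minimal standalone sketch of just the capacity calculation, with a few worked inputs. grownCapacity() and CapacityGrowthDemo are hypothetical names; the method takes the capacity and writerIndex as parameters instead of reading them from a buffer, and returns the capacity unchanged when no growth is needed.

```java
class CapacityGrowthDemo {

    /** Mirrors the capacity calculation of ensureWritableBytes() on plain ints. */
    static int grownCapacity(int capacity, int writerIndex, int minWritableBytes) {
        int writableBytes = capacity - writerIndex;
        if (minWritableBytes <= writableBytes) {
            return capacity;                      // enough room, no growth
        }
        int newCapacity = (capacity == 0) ? 1 : capacity;
        int minNewCapacity = writerIndex + minWritableBytes;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;                    // double until the write fits
        }
        return newCapacity;
    }

    public static void main(String[] args) {
        System.out.println(grownCapacity(0, 0, 5));     // 1 -> 2 -> 4 -> 8
        System.out.println(grownCapacity(16, 10, 6));   // 6 bytes free, stays 16
        System.out.println(grownCapacity(16, 10, 30));  // needs 40: 16 -> 32 -> 64
        System.out.println(grownCapacity(3, 3, 1));     // needs 4: 3 -> 6
    }
}
```

The last case shows that the result is a power-of-two multiple of the starting capacity, not necessarily a power of two itself.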

ByteBufferBackedChannelBuffer is a ChannelBuffer implementation backed by a Java NIO ByteBuffer, which it holds in its buffer field. Its getBytes() and setBytes() methods are implemented as follows:

public void getBytes(int index, byte[] dst, int dstIndex, int length) {
    ByteBuffer data = buffer.duplicate();
    try {
        data.limit(index + length).position(index);
    } catch (IllegalArgumentException e) {
        throw new IndexOutOfBoundsException();
    }
    data.get(dst, dstIndex, length);
}

public void setBytes(int index, byte[] src, int srcIndex, int length) {
    ByteBuffer data = buffer.duplicate();
    data.limit(index + length).position(index);
    data.put(src, srcIndex, length);
}

Both methods above come from the ByteBufferBackedChannelBuffer class and follow the same pattern.

In the getBytes() method, a duplicate of the internal ByteBuffer is created, and its position and limit are set to the specified range. Then, the data is read from the ByteBuffer using the get() method and written to the specified destination byte array.

In the setBytes() method, a duplicate of the internal ByteBuffer is created, and its position and limit are set to the specified range. Then, the data from the source byte array is written to the ByteBuffer using the put() method.

The remaining methods in ByteBufferBackedChannelBuffer are implemented in a similar manner, using the methods provided by the underlying ByteBuffer.
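The reason for working on a duplicate can be seen with the JDK alone: ByteBuffer.duplicate() returns a view that shares the original’s content but has its own position and limit. The sketch below reproduces the pattern outside Dubbo; DuplicateViewDemo, readRange(), and writeRange() are hypothetical names for this example, and the exception translation from the original getBytes() is left out for brevity.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class DuplicateViewDemo {

    /** Reads length bytes starting at index without moving the original's position. */
    static byte[] readRange(ByteBuffer buffer, int index, int length) {
        ByteBuffer data = buffer.duplicate();          // shared content, independent cursor
        data.limit(index + length).position(index);    // narrow the view to [index, index + length)
        byte[] dst = new byte[length];
        data.get(dst, 0, length);
        return dst;
    }

    /** Writes src at index through a duplicate; the change is visible in the original. */
    static void writeRange(ByteBuffer buffer, int index, byte[] src) {
        ByteBuffer data = buffer.duplicate();
        data.limit(index + src.length).position(index);
        data.put(src, 0, src.length);
    }

    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5});
        original.position(1);

        System.out.println(Arrays.toString(readRange(original, 2, 2)));
        writeRange(original, 3, new byte[] {9});

        // The original's cursor is untouched, but its content reflects the write.
        System.out.println(original.position() + " " + original.get(3));
    }
}
```

This is what lets getBytes() and setBytes() behave as absolute operations: all cursor movement happens on the throwaway duplicate.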

The NettyBackedChannelBuffer class, on the other hand, directly implements the ChannelBuffer interface using the Netty ByteBuf class. It does not extend the AbstractChannelBuffer abstract class like other implementations.

The class provides implementations for all the methods defined in the ChannelBuffer interface, which delegate to the corresponding methods in the underlying ByteBuf.

Finally, the ChannelBuffers class is a facade class that provides a set of helper methods for creating ChannelBuffer objects. These methods include creating dynamic buffers (dynamicBuffer()), heap buffers (buffer()), wrapped buffers (wrappedBuffer()), and direct buffers (directBuffer()). There are also methods for comparing (equals(), compare()) and copying (copy(), copySlice()) buffers.
