11 Simple RPC Framework Implementation (Part 1) #

This is the last lesson in the “Fundamentals” section, where we will apply the knowledge we have learned to build a practical project - a simple version of an RPC framework. This project serves as a summary and review of the “Fundamentals” section.

RPC stands for Remote Procedure Call, and it can be explained as calling remote services just like local method calls. Although the definition of RPC is simple, a complete and generic RPC framework involves many aspects, such as service registration and discovery, service governance, load balancing, cluster fault tolerance, and RPC protocols, as shown in the diagram below:

1.png

Architecture of a simple RPC framework

In this lesson, we will mainly implement the fundamental part of the RPC framework - remote invocation. The core process of a single remote invocation in the simple RPC framework is as follows:

  1. The client first calls the local proxy, which is represented as Proxy in the diagram.
  2. On the client-side, the proxy serializes the input data into byte streams based on the protocol.
  3. The client then sends the byte data to the server-side through the network.
  4. Upon receiving the byte data, the server deserializes it based on the protocol to obtain the corresponding request information.
  5. The server-side proxy calls the corresponding business logic based on the deserialized request information.
  6. The return value of the server-side business logic is also returned to the client-side following the above logic.

Understanding this process is crucial for developing the subsequent parts of the simple RPC framework.
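The six steps above can be sketched in a single process using only the JDK. This is a simplified illustration, not the framework's code: the class InvocationFlowSketch, the HelloService interface, the Call body, and the Transport hook are all hypothetical names, JDK serialization stands in for the real protocol, and the "network" is a direct method call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;

/** Hypothetical in-process walkthrough of one remote invocation. */
final class InvocationFlowSketch {

    /** Sample business interface (an assumption for this sketch). */
    interface HelloService { String hello(String name); }

    /** Stand-in for the network: takes request bytes, returns response bytes. */
    interface Transport { byte[] send(byte[] request) throws Exception; }

    /** Minimal request body: which method to call and with which arguments. */
    static final class Call implements Serializable {
        private static final long serialVersionUID = 1L;
        final String serviceName;
        final String methodName;
        final Class<?>[] argTypes;
        final Object[] args;

        Call(String serviceName, String methodName, Class<?>[] argTypes, Object[] args) {
            this.serviceName = serviceName;
            this.methodName = methodName;
            this.argTypes = argTypes;
            this.args = args;
        }
    }

    static byte[] toBytes(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        }
    }

    /** Steps 1-3: the local proxy serializes the call and hands the bytes to the transport. */
    @SuppressWarnings("unchecked")
    static <T> T proxy(Class<T> iface, Transport transport) {
        InvocationHandler handler = (self, method, args) -> {
            Call call = new Call(iface.getName(), method.getName(), method.getParameterTypes(), args);
            byte[] response = transport.send(toBytes(call));
            return fromBytes(response); // step 6: the result travels back the same way
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    /** Steps 4-5: deserialize the request and invoke the business object by reflection. */
    static byte[] serve(Map<String, Object> services, byte[] requestBytes) throws Exception {
        Call call = (Call) fromBytes(requestBytes);
        Object target = services.get(call.serviceName);
        Method method = Class.forName(call.serviceName).getMethod(call.methodName, call.argTypes);
        return toBytes(method.invoke(target, call.args));
    }

    public static void main(String[] args) {
        Map<String, Object> services =
                Map.of(HelloService.class.getName(), (HelloService) name -> "hello, " + name);
        HelloService client = proxy(HelloService.class, bytes -> serve(services, bytes));
        System.out.println(client.hello("rpc")); // prints "hello, rpc"
    }
}
```

The caller only sees the HelloService interface; everything between the proxy and the business object is what the rest of the framework has to provide.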

Project Structure #

After understanding the workflow and the implementation goals of the simple RPC framework, let’s take a look at the project structure. For convenience, we have put the entire project in one module, as shown in the following diagram. You can divide it into modules according to your needs.

image

So, what is the function of each package? We will describe them one by one.

  • protocol: Defines the custom protocol of the simple RPC framework.
  • serialization: Provides utility classes for serialization and deserialization corresponding to the custom protocol.
  • codec: Provides encoders and decoders corresponding to the custom protocol.
  • transport: Provides low-level network communication based on Netty. It uses encoders and decoders defined in the codec package, as well as serializers and deserializers from the serialization package.
  • registry: Implements a simplified version of the registration center based on ZooKeeper and Curator.
  • proxy: Implements a layer of proxy using JDK dynamic proxy.

Custom Protocol #

There are already many mature protocols available, such as HTTP and HTTPS. So why do we need to define our own RPC protocol?

From a functional perspective, HTTP 1.X works in a half-duplex, request-response mode: although it supports persistent connections, the server cannot push data to the client on its own initiative. In terms of efficiency, a simple remote invocation only needs to transmit the method name and a few simple parameters, so most of an HTTP request is taken up by headers while the actual payload is very small, which makes the transmission inefficient.

Of course, HTTP also has its advantages. For example, it naturally penetrates firewalls, many frameworks and open-source software support HTTP interfaces, and it is convenient to use in conjunction with REST specifications. Therefore, many RPC frameworks, such as gRPC and Spring Cloud, use the HTTP protocol directly, especially since HTTP 2.0.

Here, we define a simple version of the Demo RPC protocol, as shown in the following diagram:

image

The Demo RPC message header contains the control information of the entire RPC message: the magic number, version number, additional information (extraInfo), message ID, and the length of the message body. Within extraInfo, the message type, serialization method, compression method, and request type are encoded in separate bits. Of course, you can also extend the Demo RPC protocol to implement more complex functionality.
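To make the bit division concrete, here is a minimal sketch of packing several flags into one byte. The exact bit positions are not given in the text, so the layout below, the class name ExtraInfoBits, and all of its constants are assumptions for illustration only.

```java
/** Hypothetical bit layout for the one-byte extraInfo field. */
final class ExtraInfoBits {
    // Assumed layout, lowest bit first (not taken from the Demo RPC source):
    //   bit 0     message type   (0 = request, 1 = response)
    //   bits 1-2  serialization  (up to 4 ids)
    //   bits 3-4  compression    (up to 4 ids)
    //   bit 5     request type   (0 = normal, 1 = heartbeat)
    static final int TYPE_MASK = 0x01;
    static final int SERIALIZATION_SHIFT = 1;
    static final int COMPRESSION_SHIFT = 3;
    static final int TWO_BIT_MASK = 0x03;
    static final int HEARTBEAT_MASK = 0x20;

    static byte pack(boolean response, int serializationId, int compressionId, boolean heartbeat) {
        int bits = (response ? TYPE_MASK : 0)
                | ((serializationId & TWO_BIT_MASK) << SERIALIZATION_SHIFT)
                | ((compressionId & TWO_BIT_MASK) << COMPRESSION_SHIFT)
                | (heartbeat ? HEARTBEAT_MASK : 0);
        return (byte) bits;
    }

    static boolean isResponse(byte extraInfo) {
        return (extraInfo & TYPE_MASK) != 0;
    }

    static int serializationId(byte extraInfo) {
        return (extraInfo >> SERIALIZATION_SHIFT) & TWO_BIT_MASK;
    }

    static int compressionId(byte extraInfo) {
        return (extraInfo >> COMPRESSION_SHIFT) & TWO_BIT_MASK;
    }

    static boolean isHeartBeat(byte extraInfo) {
        return (extraInfo & HEARTBEAT_MASK) != 0;
    }

    public static void main(String[] args) {
        byte extraInfo = pack(true, 2, 1, false);
        System.out.println(Integer.toBinaryString(extraInfo & 0xFF)); // prints "1101"
        System.out.println(serializationId(extraInfo));               // prints "2"
    }
}
```

One byte leaves room for a few more flags, which is one way the protocol could be extended without changing the header size.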

The entity class corresponding to the Demo RPC message header is Header, defined as follows:

public class Header {
    private short magic; // Magic number
    private byte version; // Protocol version
    private byte extraInfo; // Additional information
    private Long messageId; // Message ID
    private Integer size; // Length of the message body

    ... // Getter and setter methods omitted
}
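The five fields add up to a fixed 16-byte header (2 + 1 + 1 + 8 + 4). The following sketch writes and reads that layout with java.nio.ByteBuffer, which uses big-endian byte order by default, the same as Netty's ByteBuf. The class HeaderLayoutSketch, its helper methods, and the MAGIC value are hypothetical and only illustrate the field offsets.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper that shows where each header field sits in the 16 bytes. */
final class HeaderLayoutSketch {
    // magic (2) + version (1) + extraInfo (1) + messageId (8) + size (4)
    static final int HEADER_SIZE = 2 + 1 + 1 + 8 + 4;
    static final short MAGIC = (short) 0xE0F1; // placeholder value, an assumption

    static byte[] write(short magic, byte version, byte extraInfo, long messageId, int size) {
        return ByteBuffer.allocate(HEADER_SIZE)
                .putShort(magic)      // offset 0
                .put(version)         // offset 2
                .put(extraInfo)       // offset 3
                .putLong(messageId)   // offset 4
                .putInt(size)         // offset 12
                .array();
    }

    static short readMagic(byte[] header) {
        return ByteBuffer.wrap(header).getShort(0);
    }

    static long readMessageId(byte[] header) {
        return ByteBuffer.wrap(header).getLong(4);
    }

    static int readSize(byte[] header) {
        return ByteBuffer.wrap(header).getInt(12);
    }

    public static void main(String[] args) {
        byte[] header = write(MAGIC, (byte) 1, (byte) 0, 42L, 128);
        System.out.println(header.length);         // prints "16"
        System.out.println(readMessageId(header)); // prints "42"
        System.out.println(readSize(header));      // prints "128"
    }
}
```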

Once we have determined the structure of the Demo RPC protocol message header, let’s take a look at the fields that make up the Demo RPC protocol message body. Here, we use two entity classes, Request and Response, to represent the message bodies of request and response respectively:

public class Request implements Serializable {
    private String serviceName; // Name of the requested service class
    private String methodName; // Name of the requested method
    private Class<?>[] argTypes; // Parameter types of the requested method
    private Object[] args; // Arguments of the requested method

    ... // Getter and setter methods omitted
}

public class Response implements Serializable {
    private int code = 0; // Response code: 0 for a normal response, non-zero for an exception response
    private String errMsg; // Exception message
    private Object result; // Response result

    ... // Getter and setter methods omitted
}

Note that Request and Response both implement the Serializable interface. Objects of these two classes are transmitted between the Client and Server processes, so they must be serialized before sending and deserialized after receiving. Here, a Serialization interface is defined to abstract the serialization-related operations:

public interface Serialization {
    <T> byte[] serialize(T obj) throws IOException;

    <T> T deSerialize(byte[] data, Class<T> clz) throws IOException;
}

In Demo RPC, Hessian serialization is used by default. The following HessianSerialization implements the Serialization interface based on Hessian serialization:

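import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class HessianSerialization implements Serialization {

    @Override
    public <T> byte[] serialize(T obj) throws IOException {
        // Write the object into an in-memory buffer using the Hessian format
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        HessianOutput hessianOutput = new HessianOutput(os);
        hessianOutput.writeObject(obj);
        return os.toByteArray();
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T deSerialize(byte[] data, Class<T> clazz)
            throws IOException {
        // Read the object back from the byte array, using clazz as the expected type
        ByteArrayInputStream is = new ByteArrayInputStream(data);
        HessianInput hessianInput = new HessianInput(is);
        return (T) hessianInput.readObject(clazz);
    }
}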
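For comparison, the same interface can be implemented with nothing but the JDK. The sketch below is not part of Demo RPC: JdkSerialization is a hypothetical class, and the Serialization interface is repeated only so that the example compiles on its own. It shows that any format can be plugged in as long as it honors the two methods.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/** Repeated from the lesson so that this sketch is self-contained. */
interface Serialization {
    <T> byte[] serialize(T obj) throws IOException;

    <T> T deSerialize(byte[] data, Class<T> clz) throws IOException;
}

/** Hypothetical alternative implementation based on built-in JDK serialization. */
final class JdkSerialization implements Serialization {

    @Override
    public <T> byte[] serialize(T obj) throws IOException {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(os)) {
            out.writeObject(obj); // obj must implement java.io.Serializable
        }
        return os.toByteArray();
    }

    @Override
    public <T> T deSerialize(byte[] data, Class<T> clz) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return clz.cast(in.readObject());
        } catch (ClassNotFoundException e) {
            // Keep the interface's exception contract by wrapping the checked exception
            throw new IOException("unknown class in payload", e);
        }
    }

    public static void main(String[] args) throws IOException {
        Serialization serialization = new JdkSerialization();
        byte[] bytes = serialization.serialize("ping");
        System.out.println(serialization.deSerialize(bytes, String.class)); // prints "ping"
    }
}
```

JDK serialization typically produces larger payloads than Hessian, which is one reason a compact format is a common default for RPC.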

In some scenarios, the data transmitted in a request or response is relatively large, and sending it as-is consumes a lot of bandwidth, so the message body is usually compressed before it is sent. In the extraInfo field of the Demo RPC message header introduced earlier, dedicated bits indicate the compression method of the message body. Here, a Compressor interface is defined to abstract all compression algorithms:

public interface Compressor {

    byte[] compress(byte[] array) throws IOException;

    byte[] unCompress(byte[] array) throws IOException;

}

At the same time, an implementation based on the Snappy compression algorithm is provided as the default compression algorithm for Demo RPC:

import java.io.IOException;
import org.xerial.snappy.Snappy;

public class SnappyCompressor implements Compressor {

    @Override
    public byte[] compress(byte[] array) throws IOException {
        if (array == null) { return null; }
        return Snappy.compress(array);
    }

    @Override
    public byte[] unCompress(byte[] array) throws IOException {
        if (array == null) { return null; }
        return Snappy.uncompress(array);
    }
}
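Because the algorithm sits behind the Compressor interface, another one can be swapped in without touching the rest of the framework. As an illustration only, here is a hypothetical GzipCompressor built on the JDK's java.util.zip classes; it is not part of Demo RPC, and the interface is repeated so that the sketch compiles on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Repeated from the lesson so that this sketch is self-contained. */
interface Compressor {
    byte[] compress(byte[] array) throws IOException;

    byte[] unCompress(byte[] array) throws IOException;
}

/** Hypothetical JDK-only alternative to the Snappy-based default. */
final class GzipCompressor implements Compressor {

    @Override
    public byte[] compress(byte[] array) throws IOException {
        if (array == null) { return null; }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the GZIP stream flushes the trailer, so read the buffer only afterwards
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(array);
        }
        return bos.toByteArray();
    }

    @Override
    public byte[] unCompress(byte[] array) throws IOException {
        if (array == null) { return null; }
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(array))) {
            return gzip.readAllBytes(); // requires Java 9 or later
        }
    }

    public static void main(String[] args) throws IOException {
        Compressor compressor = new GzipCompressor();
        byte[] original = new byte[10_000]; // highly repetitive input compresses well
        byte[] compressed = compressor.compress(original);
        System.out.println(compressed.length < original.length);           // prints "true"
        System.out.println(compressor.unCompress(compressed).length);      // prints "10000"
    }
}
```

GZIP usually trades more CPU time for a better compression ratio than Snappy, so which one fits depends on whether bandwidth or latency matters more.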
    

### Implementation of Encoding and Decoding

Now that we understand the structure of the custom protocol, let's solve the problem of encoding and decoding the protocol.

In the previous lesson, when we introduced the core concepts of Netty, we mentioned that each Channel in Netty is bound to a ChannelPipeline and relies on the ChannelHandler added to the ChannelPipeline to handle the data received (or to be sent), including the conversion between bytes and messages. Netty provides abstract classes such as ByteToMessageDecoder, MessageToByteEncoder, MessageToMessageEncoder, and MessageToMessageDecoder to implement the conversion between Message and ByteBuf, as shown in the following figure:

![image](../images/CgqCHl9N89uAPRjFAAE8pTlyXls424.png)

Netty's Decoder and Encoder implementation

In the Netty source code, we can see that the encoding and decoding of many existing protocols are built on these abstract classes. For example, HttpServerCodec relies on HttpServerRequestDecoder and HttpServerResponseEncoder to decode HTTP requests and encode HTTP responses, respectively. As shown in the following figure, HttpServerRequestDecoder inherits from ByteToMessageDecoder and converts ByteBuf into HTTP requests, while HttpServerResponseEncoder inherits from MessageToMessageEncoder and converts HTTP responses into other messages (including ByteBuf).

![HttpServerCodec structure diagram.png](../images/CgqCHl9OAneAfCv0AADjLyEPSpc098.png)

Decoder and Encoder implementation of HTTP protocol in Netty

In the simplified version of the RPC framework, our custom requests are not as complex as the HTTP protocol, so we only need to **simply inherit from ByteToMessageDecoder and MessageToByteEncoder**.

First, let's look at the DemoRpcDecoder, which implements the conversion from ByteBuf to Demo RPC Message. The specific implementation is as follows:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;

public class DemoRpcDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx,
            ByteBuf byteBuf, List<Object> out) throws Exception {
        // With fewer than 16 readable bytes the header cannot be parsed yet, so wait for more data
        if (byteBuf.readableBytes() < Constants.HEADER_SIZE) {
            return;
        }
        // Record the current readerIndex so that it can be restored later
        byteBuf.markReaderIndex();
        // Read the magic number; if it does not match, restore the readerIndex and throw an exception
        short magic = byteBuf.readShort();
        if (magic != Constants.MAGIC) {
            byteBuf.resetReaderIndex();
            throw new RuntimeException("magic number error:" + magic);
        }
        // Read the version, extra information, message ID, and message body length from the header
        byte version = byteBuf.readByte();
        byte extraInfo = byteBuf.readByte();
        long messageId = byteBuf.readLong();
        int size = byteBuf.readInt();
        Object request = null;
        // Heartbeat messages have no message body, so there is nothing more to read for them
        if (!Constants.isHeartBeat(extraInfo)) {
            // Not enough data has accumulated for the body: restore the readerIndex and wait
            if (byteBuf.readableBytes() < size) {
                byteBuf.resetReaderIndex();
                return;
            }
            byte[] payload = new byte[size];
            byteBuf.readBytes(payload);
            // Choose the serialization and compression methods according to extraInfo
            Serialization serialization = SerializationFactory.get(extraInfo);
            Compressor compressor = CompressorFactory.get(extraInfo);
            // Decompress and then deserialize to obtain the message body
            request = serialization.deSerialize(compressor.unCompress(payload), Request.class);
        }
        // Assemble the header and body into a complete Message and pass it to the next handler
        Header header = new Header(magic, version, extraInfo, messageId, size);
        Message message = new Message(header, request);
        out.add(message);
    }
}
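The mark-and-reset pattern in the decoder is what copes with TCP delivering only part of a message at a time. The following JDK-only sketch reproduces just that part with java.nio.ByteBuffer instead of Netty's ByteBuf. FrameSplitSketch and its methods are hypothetical, and the header values written by frame() are arbitrary placeholders.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-alone illustration of half-packet handling. */
final class FrameSplitSketch {
    static final int HEADER_SIZE = 16;

    /** Returns the body of one complete frame, or null if more bytes must arrive first. */
    static byte[] tryReadFrame(ByteBuffer in) {
        if (in.remaining() < HEADER_SIZE) {
            return null;                 // header incomplete: consume nothing
        }
        in.mark();                       // plays the role of markReaderIndex()
        in.position(in.position() + 12); // skip magic, version, extraInfo, messageId
        int size = in.getInt();
        if (in.remaining() < size) {
            in.reset();                  // plays the role of resetReaderIndex()
            return null;
        }
        byte[] payload = new byte[size];
        in.get(payload);
        return payload;
    }

    /** Builds a frame with placeholder header values and the given body. */
    static byte[] frame(String body) {
        byte[] payload = body.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
                .putShort((short) 1).put((byte) 1).put((byte) 0).putLong(7L)
                .putInt(payload.length).put(payload)
                .array();
    }

    public static void main(String[] args) {
        byte[] frame = frame("hello");

        // Only the header plus 2 of the 5 body bytes have "arrived"
        ByteBuffer partial = ByteBuffer.wrap(frame, 0, 18);
        System.out.println(tryReadFrame(partial) == null); // prints "true"
        System.out.println(partial.position());            // prints "0"

        // The whole frame is available
        ByteBuffer full = ByteBuffer.wrap(frame);
        System.out.println(new String(tryReadFrame(full), StandardCharsets.UTF_8)); // prints "hello"
    }
}
```

Leaving the read position untouched on an incomplete frame is the key point: the next time more bytes arrive, parsing starts again from the beginning of the header.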


Next, let's look at DemoRpcEncoder, which implements the conversion from Demo RPC Message to ByteBuf. The specific implementation is as follows:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

class DemoRpcEncoder extends MessageToByteEncoder<Message> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
            Message message, ByteBuf byteBuf) throws Exception {
        Header header = message.getHeader();
        // Write the magic number, version, extra information, and message ID of the header
        byteBuf.writeShort(header.getMagic());
        byteBuf.writeByte(header.getVersion());
        byteBuf.writeByte(header.getExtraInfo());
        byteBuf.writeLong(header.getMessageId());
        Object content = message.getContent();
        // A heartbeat message has no message body, so write 0 as the body length
        if (Constants.isHeartBeat(header.getExtraInfo())) {
            byteBuf.writeInt(0);
            return;
        }
        // Choose the serialization and compression methods according to extraInfo
        Serialization serialization = SerializationFactory.get(header.getExtraInfo());
        Compressor compressor = CompressorFactory.get(header.getExtraInfo());
        // Serialize the message body and then compress it
        byte[] payload = compressor.compress(serialization.serialize(content));
        byteBuf.writeInt(payload.length); // Write the length of the message body
        byteBuf.writeBytes(payload); // Write the message body
    }
}


### Summary

In this lesson, we first introduced the basic architecture of the simplified RPC framework and the basic process of handling a remote call. We also briefly introduced the structure of the entire simplified RPC framework project. Then, we explained the custom protocol format, serialization/deserialization methods, and compression methods used in the simplified RPC framework, which are essential for remote data transmission. Next, we introduced the encoding and decoding system in Netty and the codec implementation for the HTTP protocol. Finally, we analyzed the codecs of the simplified RPC protocol, namely DemoRpcEncoder and DemoRpcDecoder.

In the next lesson, we will continue to introduce the remaining implementation of the simplified RPC framework from bottom to top.

Link to the simplified version of the RPC framework Demo: <https://github.com/xxxlxy2008/demo-prc>.