25 Remote Communication, Designing Communication Protocols and Implementing Encoding and Decoding

25 Remote communication, designing communication protocols and implementing encoding and decoding #

In the previous class, we built the basic framework for service providers and service consumers, and now we can establish a communication mechanism between the two modules. In this lesson, we will complete the remote communication mechanism of the RPC framework by adding custom business processors to the ChannelPipeline. The main functionalities that need to be implemented are as follows:

  • The service consumer implements protocol encoding to send invocation data to the service provider.
  • The service provider decodes the received data and then sends the response data to the service consumer, temporarily ignoring how the RPC request is invoked.
  • The service consumer receives the response data and returns successfully.

Source code reference: mini-rpc

RPC Communication Scheme Design #

Considering the objectives of this lesson, we will now break down and analyze the processes of RPC request invocation and result response. Let’s first look at the process of RPC request invocation, as shown in the following diagram.

Drawing 0.png

For the service consumer, the RPC request process is an outbound operation, while for the service provider, it is an inbound operation. Before sending the data, the service consumer encapsulates the RPC request information into a MiniRpcProtocol object and then performs binary encoding using the MiniRpcEncoder. Finally, the encoded data is directly sent to the remote end. After receiving the request data, the service provider hands over the binary data to the decoder MiniRpcDecoder, which decodes it into a MiniRpcProtocol object again, and then passes it to RpcRequestHandler to perform the actual RPC request invocation.

We temporarily ignore how RpcRequestHandler executes the RPC request invocation. Now let’s continue to analyze how RpcRequestHandler returns the response result to the service consumer after processing it successfully, as shown in the following diagram:

Drawing 1.png

Contrary to the RPC request process, the service provider encapsulates the response result into a MiniRpcProtocol object and then encodes it using the MiniRpcEncoder before sending it to the service consumer. The service consumer decodes the response result. Since RPC requests are highly concurrent, the RpcRequestHandler needs to find the corresponding request based on the response result. Finally, the response result is returned to the service consumer.

From the comprehensive analysis of the RPC request invocation and result response processes, it can be seen that the encoder MiniRpcEncoder, decoder MiniRpcDecoder, and communication protocol object MiniRpcProtocol can all be designed to be reusable. The ChannelPipeline structure for the service consumer and service provider is shown in the following diagram.

Drawing 2.png

Therefore, when implementing the Netty network communication module, it is very helpful to draw a diagram and analyze the processing flow of ChannelHandlers first.

Custom RPC Communication Protocol #

The protocol is the foundation of communication between service consumers and service providers. Mainstream RPC frameworks often customize their own communication protocols, which can achieve better performance, scalability, and security compared to general protocols such as HTTP, HTTPS, and JSON. In the course “Secret Handshake: How to Implement Custom Protocol Communication Using Netty,” we learned about the factors to consider when designing a complete communication protocol. Combining it with the scenarios of RPC request invocation and result response, we have designed a simplified version of the RPC custom protocol, as shown below:

+---------------------------------------------------------------+

| Magic Number (2 bytes) | Protocol Version (1 byte) | Serialization Algorithm (1 byte) | Message Type (1 byte) |

+---------------------------------------------------------------+

| Status (1 byte) | Message ID (8 bytes) | Data Length (4 bytes) |

+---------------------------------------------------------------+

| Data Content (variable length) |

+---------------------------------------------------------------+

We divide the protocol into two parts: the protocol header (Header) and the protocol body (Body). The protocol header contains the magic number, protocol version, serialization algorithm, message type, status, message ID, and data length, while the protocol body only contains the data content, the length of which is variable. This protocol can be used for communication between RPC requests and responses. The definitions of the corresponding protocol entity classes are as follows:

@Data

public class MiniRpcProtocol<T> implements Serializable {

    private MsgHeader header; // Protocol header

    private T body; // Protocol body

}

@Data

public class MsgHeader implements Serializable {

    private short magic; // Magic number

    private byte version; // Protocol version

    private byte serialization; // Serialization algorithm

    private byte msgType; // Message type

    private byte status; // Status

    private long requestId; // Message ID

    private int msgLen; // Length of data

}

In the scenario of RPC request invocation, the generic type T in MiniRpcProtocol corresponds to the MiniRpcRequest type, which mainly includes the necessary parameters for RPC remote invocation, defined as follows:

@Data

public class MiniRpcRequest implements Serializable {

    private String serviceVersion; // Service version

    private String className; // Service interface name

    private String methodName; // Service method name

    private Object[] params; // Method parameter list

    private Class<?>[] parameterTypes; // Method parameter type list

}

In the scenario of RPC result response, the generic type T in MiniRpcProtocol corresponds to the MiniRpcResponse type. The definition of the MiniRpcResponse entity class is as follows. In addition, whether the response result is successful can be represented by the status field in MsgHeader. 0 represents success and a non-zero value represents failure. The data field in MiniRpcResponse represents the RPC request result returned in a successful state, and the message field represents the error message when the RPC request invocation fails.

@Data

public class MiniRpcResponse implements Serializable {

    private Object data; // Request result

    private String message; // Error message

}

After designing the RPC custom protocol, let’s solve the problem of how to encode MiniRpcRequest and MiniRpcResponse.

Serialization Selection #

The MiniRpcRequest and MiniRpcResponse entity classes represent the content of the protocol body, the length of which is uncertain. Therefore, we generally choose a commonly used and efficient serialization algorithm to convert them into binary data. This can effectively reduce the bandwidth of network transmission and improve the overall performance of the RPC framework. Currently, commonly used serialization algorithms include Json, Kryo, Hessian, Protobuf, etc. These third-party serialization algorithms are more efficient than the native Java serialization operations.

First, we define a common serialization interface RpcSerialization, which must be implemented by all serialization algorithm extensions. The RpcSerialization interface provides serialization and deserialization methods which are serialize() and deserialize() respectively, as shown below:

public interface RpcSerialization {

    <T> byte[] serialize(T obj) throws IOException;

    <T> T deserialize(byte[] data, Class<T> clz) throws IOException;

}

Next, we provide two implementation classes, HessianSerialization and JsonSerialization, for RpcSerialization. Here is the implementation logic of HessianSerialization:

@Component
@Slf4j
public class HessianSerialization implements RpcSerialization {

    @Override
    public <T> byte[] serialize(T object) {

        if (object == null) {
            throw new NullPointerException();
        }

        byte[] results;
        HessianSerializerOutput hessianOutput;

        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
            hessianOutput = new HessianSerializerOutput(os);
            hessianOutput.writeObject(object);
            hessianOutput.flush();
            results = os.toByteArray();
        } catch (Exception e) {
            throw new SerializationException(e);
        }

        return results;
    }

    @SuppressWarnings("unchecked")
    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clz) {

        if (bytes == null) {
            throw new NullPointerException();
        }

        T result;

        try (ByteArrayInputStream is = new ByteArrayInputStream(bytes)) {
            HessianSerializerInput hessianInput = new HessianSerializerInput(is);
            result = (T) hessianInput.readObject(clz);
        } catch (Exception e) {
            throw new SerializationException(e);
        }

        return result;
    }

}

To support different serialization algorithms, we use the factory pattern to switch between different serialization algorithms using the same serialization interface. Users only need to know the type of serialization algorithm, and they don’t need to care about how the underlying serialization is implemented. Here is the specific implementation:

public class SerializationFactory {

    public static RpcSerialization getRpcSerialization(byte serializationType) {

        SerializationTypeEnum typeEnum = SerializationTypeEnum.findByType(serializationType);

        switch (typeEnum) {
            case HESSIAN:
                return new HessianSerialization();
            case JSON:
                return new JsonSerialization();
            default:
                throw new IllegalArgumentException("serialization type is illegal, " + serializationType);
        }

    }

}

With the above knowledge in place, we can now start implementing custom handlers.

Protocol Encoding Implementation #

In the course “Secret Language: How to Implement Custom Protocol Communication Using Netty”, we also introduced how to use Netty to implement custom communication protocols. Netty provides two commonly used abstract base classes for encoding and decoding: MessageToByteEncoder and ByteToMessageDecoder, which help us easily extend and implement custom protocols.

The encoder we are going to implement, MiniRpcEncoder, needs to inherit MessageToByteEncoder and override the encode() method. Here is the specific implementation:

public class MiniRpcEncoder extends MessageToByteEncoder<MiniRpcProtocol<Object>> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MiniRpcProtocol<Object> msg, ByteBuf byteBuf) throws Exception {

        MsgHeader header = msg.getHeader();

        byteBuf.writeShort(header.getMagic());

        byteBuf.writeByte(header.getVersion());

        byteBuf.writeByte(header.getSerialization());

        byteBuf.writeByte(header.getMsgType());

        byteBuf.writeByte(header.getStatus());

        byteBuf.writeLong(header.getRequestId());

Note: The translation may not be perfect and may require further review.

RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(header.getSerialization());

byte[] data = rpcSerialization.serialize(msg.getBody());

byteBuf.writeInt(data.length);

byteBuf.writeBytes(data);

}

}

The encoding logic is relatively simple. Before the service consumer or service provider calls writeAndFlush() to send data to the other party, it has already been encapsulated as MiniRpcRequest or MiniRpcResponse, so MiniRpcProtocol <Object> can be used as the encoding type supported by MiniRpcEncoder.

Protocol decoding implementation #

The decoder MiniRpcDecoder needs to inherit ByteToMessageDecoder and rewrite the decode() method. The specific implementation is as follows:

public class MiniRpcDecoder extends ByteToMessageDecoder {

    @Override

    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        if (in.readableBytes() < ProtocolConstants.HEADER_TOTAL_LEN) {

            return;

        }

        in.markReaderIndex();

        short magic = in.readShort();

        if (magic != ProtocolConstants.MAGIC) {

            throw new IllegalArgumentException("magic number is illegal, " + magic);

        }

        byte version = in.readByte();

        byte serializeType = in.readByte();

        byte msgType = in.readByte();

        byte status = in.readByte();

        long requestId = in.readLong();

        int dataLength = in.readInt();

        if (in.readableBytes() < dataLength) {

            in.resetReaderIndex();

            return;

        }

        byte[] data = new byte[dataLength];

        in.readBytes(data);

        MsgType msgTypeEnum = MsgType.findByType(msgType);

        if (msgTypeEnum == null) {

            return;

        }

        MsgHeader header = new MsgHeader();

        header.setMagic(magic);

        header.setVersion(version);

        header.setSerialization(serializeType);

        header.setStatus(status);

        header.setRequestId(requestId);

        header.setMsgType(msgType);

        header.setMsgLen(dataLength);

        RpcSerialization rpcSerialization = SerializationFactory.getRpcSerialization(serializeType);

        switch (msgTypeEnum) {

            case REQUEST:

                MiniRpcRequest request = rpcSerialization.deserialize(data, MiniRpcRequest.class);

                if (request != null) {

                    MiniRpcProtocol<MiniRpcRequest> protocol = new MiniRpcProtocol<>();

                    protocol.setHeader(header);

                    protocol.setBody(request);

                    out.add(protocol);

                }

            case RESPONSE:

                MiniRpcResponse response = rpcSerialization.deserialize(data, MiniRpcResponse.class);

                if (response != null) {

                    MiniRpcProtocol<MiniRpcResponse> protocol = new MiniRpcProtocol<>();

                    protocol.setHeader(header);

                    protocol.setBody(response);

                    out.add(protocol);

                }

            case HEARTBEAT:

                // TODO

                break;

        }

    }

}

MiniRpcDecoder #

Compared to the encoder MiniRpcEncoder, the decoder MiniRpcDecoder is much more complex. The goal of MiniRpcDecoder is to decode the byte stream data into message objects and pass them to the next inbound handler. There are several key points to note in the entire decoding process of MiniRpcDecoder:

  • The data is only read when the content in the ByteBuf is larger than the fixed 18 bytes of the protocol header Header.
  • Even if the protocol header Header can be read completely, the protocol body Body may not be ready yet. So when reading the data just start, you need to use the markReaderIndex() method to mark the read pointer position. When the number of readable bytes in the ByteBuf is less than the length of the protocol body Body, use the resetReaderIndex() method to restore the read pointer position, indicating that the readable bytes in the ByteBuf are not enough for a complete data packet.
  • Different protocol body objects need to be deserialized according to different message types MsgType. In the scenario of RPC request invocation, the service provider needs to deserialize the protocol body content into a MiniRpcRequest object; in the scenario of RPC result response, the service consumer needs to deserialize the protocol body content into a MiniRpcResponse object.

Request Processing and Response #

In the scenario of RPC request invocation, after the MiniRpcDecoder encoder of the service provider decodes the binary data into a MiniRpcProtocol object, it is then passed to the RpcRequestHandler to execute the RPC request invocation. RpcRequestHandler is also an inbound handler, it does not need to do any decoding work, so RpcRequestHandler can directly inherit SimpleChannelInboundHandler, and then override the channelRead0() method, the specific implementation is as follows:

@Slf4j
public class RpcRequestHandler extends SimpleChannelInboundHandler<MiniRpcProtocol<MiniRpcRequest>> {

    private final Map<String, Object> rpcServiceMap;

    public RpcRequestHandler(Map<String, Object> rpcServiceMap) {
        this.rpcServiceMap = rpcServiceMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MiniRpcProtocol<MiniRpcRequest> protocol) {
        RpcRequestProcessor.submitRequest(() -> {
            MiniRpcProtocol<MiniRpcResponse> resProtocol = new MiniRpcProtocol<>();
            MiniRpcResponse response = new MiniRpcResponse();
            MsgHeader header = protocol.getHeader();
            header.setMsgType((byte) MsgType.RESPONSE.getType());
            try {
                Object result = handle(protocol.getBody()); // TODO RPC service invocation
                response.setData(result);
                header.setStatus((byte) MsgStatus.SUCCESS.getCode());
                resProtocol.setHeader(header);
                resProtocol.setBody(response);
            } catch (Throwable throwable) {
                header.setStatus((byte) MsgStatus.FAIL.getCode());
                response.setMessage(throwable.toString());
                log.error("process request {} error", header.getRequestId(), throwable);
            }
            ctx.writeAndFlush(resProtocol);
        });
    }
}

Because RPC request invocation is time consuming, it is recommended to submit the RPC request to a custom business thread pool for execution. The handle() method is the place where the RPC invocation is actually made. You can leave it as an empty implementation for now, and we will complete it in the dynamic proxy course. Based on the execution result of handle(), MiniRpcProtocol will be set with the successful or failed status, as well as the corresponding request result or error message. Finally, the data is written back to the service consumer through the writeAndFlush() method.

In the previous section, we have analyzed the inbound operation of the service consumer. Firstly, it needs to go through the MiniRpcDecoder decoder to decode the MiniRpcProtocol response result based on the message type msgType, and then pass it to the RpcResponseHandler handler. RpcResponseHandler is responsible for responding to the request results from different threads, the specific implementation is as follows:

public class RpcResponseHandler extends SimpleChannelInboundHandler<MiniRpcProtocol<MiniRpcResponse>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MiniRpcProtocol<MiniRpcResponse> msg) {
        long requestId = msg.getHeader().getRequestId();
        MiniRpcFuture<MiniRpcResponse> future = MiniRpcRequestHolder.REQUEST_MAP.remove(requestId);
        future.getPromise().setSuccess(msg.getBody());
    }
}

public class MiniRpcRequestHolder {
    public final static AtomicLong REQUEST_ID_GEN = new AtomicLong(0);
    public static final Map<Long, MiniRpcFuture<MiniRpcResponse>> REQUEST_MAP = new ConcurrentHashMap<>();
}

@Data
public class MiniRpcFuture<T> {
    private Promise<T> promise;
    private long timeout;
    public MiniRpcFuture(Promise<T> promise, long timeout) {
        this.promise = promise;
        this.timeout = timeout;
    }
}

When the service consumer initiates a call, it maintains a mapping relationship between the request requestId and MiniRpcFuture. RpcResponseHandler will find the corresponding MiniRpcFuture that initiated the call based on the requestId of the request, and then set the response result for the MiniRpcFuture.

We use the Promise tool provided by Netty to implement synchronous waiting for RPC requests. Promise extends more new features based on JDK’s Future, helping us to better perform asynchronous programming in a synchronous manner. The Promise pattern is essentially an asynchronous programming model. We can first obtain a credential to view the task execution result without waiting for the task to complete. When we need to get the task execution result, we can use the relevant interfaces provided by the credential to obtain it.

So far, we have completed the communication module of the RPC framework. Custom protocols, encoders/decoders, and serialization/deserialization are essential basic knowledge for implementing remote communication. In addition, in the course “Architecture Design: How to Implement a High-performance Distributed RPC Framework”, we introduced multiple ways to make RPC calls. Let’s start thinking about how to implement them.

Summary #

In this lesson, by designing and implementing a custom protocol for RPC, we have deepened our understanding of Netty’s custom handler ChannelHandler. ChannelPipeline and ChannelHandler are the most frequently used components in project development. When designing, we must clarify the passing order of inbound and outbound handlers and how the data models are converted.

Here are two tasks for you:

  1. The Protobuf serialization algorithm is also a necessary skill for us. It is not implemented in this course, and you need to expand it according to the interface specifications.
  2. If you want to compress the content of the protocol body, how should the RPC custom protocol be improved? How should the encoder/decoder be implemented?