28 Hands-On Summary: RPC Practical Conclusions and Advanced Extensions #

Over the previous hands-on lessons we have built a working prototype of an RPC framework that touches most of the core concepts an RPC framework involves. Do not stop at theory: coding is a fundamental skill for every programmer, and nothing replaces hands-on practice. Although the prototype already runs, it is still far from production grade in areas such as performance and high availability. In this lesson I will review the knowledge points we have covered and supplement them with practices from mature RPC frameworks in the industry, to help you further improve your system design skills.

Summary of Practical Knowledge Points #

Starting the Netty Server #

Netty provides the ServerBootstrap bootstrap class as the entry point for starting the program. ServerBootstrap assembles the core components of Netty like building blocks. The server startup process requires the following three basic steps:

  • Configure the thread pool. Netty is developed based on the Reactor model, and in most scenarios, we use the master-slave multi-thread Reactor model.
  • Channel initialization. Set the channel type and register the ChannelHandlers in the ChannelPipeline. In addition, you can set socket parameters and user-defined attributes as needed.
  • Binding to a port. Calling the bind() method will actually trigger the startup process, and the sync() method will block until the entire startup process is completed.
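
To make the three steps concrete, here is a minimal, self-contained sketch. It uses Netty's built-in LoggingHandler as a stand-in for the real RPC codec and business handlers, and the port 9000 is chosen purely for illustration.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;

public class RpcServerBootstrap {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();   // step 1: master Reactor, accepts connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // step 1: slave Reactor, handles I/O
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 1024)
             .childHandler(new ChannelInitializer<SocketChannel>() { // step 2: channel initialization
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     // LoggingHandler stands in for the real codec and RPC handlers
                     ch.pipeline().addLast(new LoggingHandler());
                 }
             });
            ChannelFuture future = b.bind(9000).sync();       // step 3: bind and block until startup completes
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}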

Custom Communication Protocol #

A complete network protocol needs to have the following basic elements: magic number, protocol version number, serialization algorithm, message type, length field, request data, and reserved fields. When implementing protocol encoding and decoding, two important abstract classes are often used: MessageToByteEncoder (encoder) and ByteToMessageDecoder (decoder). Netty also provides many out-of-the-box unpackers. The most widely used one is LengthFieldBasedFrameDecoder, which can meet most of the scenarios in real projects. If you are not familiar with the parameters of LengthFieldBasedFrameDecoder, it may be more intuitive to directly use ByteBuf. Choose according to personal preference.
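
As an illustration of how LengthFieldBasedFrameDecoder maps onto such a protocol, here is a sketch registered inside the ChannelInitializer's initChannel(SocketChannel ch). It assumes a hypothetical header layout of a 2-byte magic number, 1-byte version, 1-byte serialization algorithm, 1-byte message type, and an 8-byte request ID, followed by a 4-byte body length and the body itself; adjust the offsets to your actual protocol.

// Hypothetical layout: the 4-byte length field starts after 13 header bytes and covers only the body
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(
        16 * 1024 * 1024, // maxFrameLength: drop frames larger than 16 MB
        13,               // lengthFieldOffset: bytes before the length field
        4,                // lengthFieldLength: size of the length field itself
        0,                // lengthAdjustment: the length counts only the body
        0));              // initialBytesToStrip: keep the whole header for the decoder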

ByteBuf #

ByteBuf is a core utility class you must master, and you should understand its internal structure. ByteBuf maintains two pointers, the read pointer (readerIndex) and the write pointer (writerIndex), plus a maximum capacity (maxCapacity). Based on the pointer positions, a ByteBuf can be divided into four regions: discarded bytes, readable bytes, writable bytes, and expandable bytes, as shown in the figure below.

(Figure: ByteBuf internal structure — discarded bytes | readable bytes | writable bytes | expandable bytes)
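
A small sketch of how readerIndex and writerIndex move as data is written and read; the byte counts follow from the 16-byte initial capacity assumed here.

ByteBuf buf = Unpooled.buffer(16);                          // readerIndex = 0, writerIndex = 0
buf.writeBytes("hello".getBytes(StandardCharsets.UTF_8));   // writerIndex -> 5
buf.readByte();                                             // readerIndex -> 1, byte [0,1) becomes discardable
System.out.println(buf.readableBytes());                    // 4  (writerIndex - readerIndex)
System.out.println(buf.writableBytes());                    // 11 (capacity - writerIndex)
buf.discardReadBytes();                                     // reclaims the discarded region
buf.release();                                              // always release reference-counted buffers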

Pipeline & ChannelHandler #

ChannelPipeline and ChannelHandler are the most frequently used components in our application development. These two components provide users with full control over I/O events. ChannelPipeline is a bidirectional linked list structure that contains two types of handlers: ChannelInboundHandler and ChannelOutboundHandler. The propagation direction of inbound events is Head -> Tail, while the propagation direction of outbound events is Tail -> Head. When designing, it is necessary to clarify the order of propagation for inbound and outbound processing, as well as how data models are converted between them.
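
A sketch of a typical RPC pipeline assembly (the handler names here are hypothetical). Inbound handlers fire in addLast order from Head to Tail, while outbound handlers fire in the reverse order on writes.

ch.pipeline()
  .addLast(new RpcFrameDecoder())     // inbound : unpacks frames (runs first, Head -> Tail)
  .addLast(new RpcProtocolDecoder())  // inbound : bytes -> request object
  .addLast(new RpcProtocolEncoder())  // outbound: response object -> bytes (runs Tail -> Head)
  .addLast(new RpcRequestHandler());  // inbound : business processing, closest to Tail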

Registry Center #

The registry center is a vital component in RPC frameworks, mainly used for service registration and discovery. Currently, the popular choices for registry centers are ZooKeeper, Eureka, Etcd, Consul, Nacos, and so on. But which type of registry center, CP or AP, should be chosen? There is no definitive answer; the selection depends on the actual business scenario. An RPC framework should depend on the registry center only weakly, so that services remain usable even if the registry center runs into trouble, which is why AP-type registry centers are generally recommended. In the service discovery scenario, AP-type registries also have a performance advantage over CP-type ones: the cluster has no Leader or Follower roles, so if one node goes offline, requests are immediately routed to other nodes, sacrificing strong consistency to ensure high availability.

When a service node goes offline, the registry center needs to timely notify the service consumers that the node has gone offline, otherwise it may cause some service invocations to fail. The best way to achieve graceful service shutdown is to adopt a scheme of active notification + heartbeat detection. Heartbeat detection can be managed by either the node or the registry center. For example, the registry center can send a heartbeat packet to the service node every 60 seconds. If no response is received for 3 consecutive heartbeat packets, it can be considered that the service node has gone offline. Heartbeat detection is usually a mechanism for notifying each other of the survival status between the client and the server. I will show you the basic implementation of heartbeat detection in the following text.

Dynamic Proxy and Reflection Invocation #

If we want to make the underlying details of RPC transparent to service consumers, we cannot avoid dynamic proxies. Dynamic proxies provide a mechanism to dynamically build proxy classes and dynamically invoke target methods at runtime. We must create an interface proxy object and implement encoding, request invocation, decoding, and other operations in the proxy object.

Common implementations of dynamic proxies are the JDK dynamic proxy and the Cglib dynamic proxy. Which one to use should be decided for the specific scenario, ideally backed by performance testing. The target of a JDK dynamic proxy must implement one or more interfaces, and the generated proxy class is itself an implementation of those interfaces; the JDK proxy invokes the target methods through reflection and cannot proxy methods that do not exist on the interface. Cglib is more flexible: it uses bytecode generation to create a subclass of the specified class and overrides its methods, so the type of the proxied class is not restricted to interfaces.
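
A minimal sketch of a JDK dynamic proxy on the consumer side; sendRpcRequest() is a hypothetical placeholder for the framework's encode-send-decode path.

import java.lang.reflect.Proxy;

public final class RpcProxyFactory {

    @SuppressWarnings("unchecked")
    public static <T> T createProxy(Class<T> serviceInterface) {
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                (proxy, method, args) -> {
                    // Encode the interface name, method, and arguments, send them over the
                    // network, and decode the response (sendRpcRequest is a hypothetical helper)
                    return sendRpcRequest(serviceInterface.getName(), method.getName(),
                            method.getParameterTypes(), args);
                });
    }

    private static Object sendRpcRequest(String service, String method,
                                         Class<?>[] paramTypes, Object[] args) {
        throw new UnsupportedOperationException("network call omitted in this sketch");
    }
}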

When the service provider receives an RPC request, it needs to execute the real method call through reflection. In order to accelerate the performance of service interface invocation, we can use the FastClass mechanism provided by Cglib to directly invoke methods, which has higher performance than reflection. The FastClass mechanism does not call the methods being proxied through reflection, but dynamically generates a new subclass of FastClass at runtime and writes the logic of directly invoking the target method into the subclass. At the same time, this subclass assigns an int type index to the proxy class, and FastClass can locate the method to be called through the index. Generating the FastClass subclass is time-consuming, and caching FastClass can further optimize the performance of the RPC framework.
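
A sketch of the FastClass mechanism using Cglib's net.sf.cglib.reflect.FastClass; UserService, its getUser(long) method, and the userService instance are hypothetical, and in a real framework both the FastClass object and the method index would be cached.

try {
    FastClass fastClass = FastClass.create(UserService.class);              // generate the FastClass subclass (cache it in practice)
    int index = fastClass.getIndex("getUser", new Class<?>[]{long.class});  // resolve the method index once
    Object result = fastClass.invoke(index, userService, new Object[]{1L}); // direct call, no reflection on the hot path
} catch (InvocationTargetException e) {
    // the target method itself threw an exception
}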

Performance Optimization #

There are many factors that affect the performance of an RPC framework. We usually focus on several aspects: I/O model, network parameters, serialization methods, memory management, etc. Next, I will introduce commonly used optimization methods in RPC frameworks in a knowledge-point format.

I/O Model #

Netty provides an efficient master-slave Reactor multi-thread model: the master Reactor thread is responsible for accepting new network connections and registering the resulting channels with the slave Reactors, while the slave Reactor threads handle all subsequent I/O operations. This model effectively removes the bottleneck of a single NIO thread handling a massive number of client connections and I/O operations under high concurrency.

We usually configure the master-slave reactor thread model as follows:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);

If you do not specify the number of workerGroup threads to initialize, Netty will default to creating a number of threads that is twice the number of CPU cores. However, this may not be the optimal number, and adjustments can be made based on actual load testing. Generally speaking, the fewer workerGroup threads initialized, the better, as this can effectively reduce thread context switching.

Netty provides a parameter called ioRatio, which can adjust the time ratio between I/O event processing and task processing. The default value is 50. For high-concurrency RPC call scenarios, ioRatio can be increased appropriately to allocate more time to executing I/O tasks.
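
Both knobs are set directly on the NioEventLoopGroup; a sketch follows, where the values 8 and 70 are purely illustrative and should come from your own load testing.

NioEventLoopGroup workerGroup = new NioEventLoopGroup(8); // thread count chosen from load testing
workerGroup.setIoRatio(70);                               // spend ~70% of each loop on I/O, 30% on queued tasks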

Netty network parameter configuration #

Netty provides ChannelOptions for us to optimize TCP parameter configuration. In order to improve network communication throughput, it is necessary to understand some optional network parameters.

  • TCP_NODELAY: Controls the Nagle algorithm (setting TCP_NODELAY to true disables it). The Nagle algorithm accumulates small packets in a buffer and sends them together, avoiding a flood of tiny packets, which is effective for bulk traffic but introduces some latency. If transmission latency matters, enable TCP_NODELAY to turn the Nagle algorithm off.
  • SO_BACKLOG: The maximum length of the completed three-way handshake request queue. The server may handle multiple connections at the same time, so the parameter should be increased appropriately in high-concurrency scenarios with massive connections. However, SO_BACKLOG should not be set too large to prevent SYN-Flood attacks.
  • SO_SNDBUF/SO_RCVBUF: The size of the TCP send buffer and receive buffer. In order to achieve the maximum network throughput, SO_SNDBUF should not be smaller than the product of bandwidth and latency. SO_RCVBUF always saves data until it is read by the application process. If SO_RCVBUF is full, the receiving end will notify the TCP protocol in the remote end to close the window, ensuring that SO_RCVBUF does not overflow.
  • SO_KEEPALIVE: Connection keep-alive. When SO_KEEPALIVE is enabled, TCP actively probes the connection status; on Linux the default probe interval is 2 hours. The TCP keep-alive mechanism is mainly useful for reclaiming connections that have been dead for a long time and is not suitable for scenarios with high real-time requirements.
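
A sketch of how these parameters are typically set through ChannelOption on the ServerBootstrap (b from the earlier snippet); the values are illustrative and should come from your own benchmarking.

b.option(ChannelOption.SO_BACKLOG, 1024)                 // accept queue of the listening socket
 .childOption(ChannelOption.TCP_NODELAY, true)           // disable Nagle for latency-sensitive RPC calls
 .childOption(ChannelOption.SO_KEEPALIVE, true)          // TCP-level keep-alive probing
 .childOption(ChannelOption.SO_SNDBUF, 64 * 1024)        // send buffer size
 .childOption(ChannelOption.SO_RCVBUF, 64 * 1024);       // receive buffer size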

Serialization methods #

During network communication, serialization and deserialization operations are inevitable, which involve encoding objects into bytes and decoding bytes into objects. Serialization and deserialization are frequent and heavy operations and are important points of performance optimization in RPC frameworks. When choosing a serialization method, various factors need to be considered, such as high performance, cross-language compatibility, maintainability, and scalability.

Commonly used serialization algorithms include Kryo, Hessian, and Protobuf, all of which are more efficient than Java's native serialization. Kryo produces a compact byte stream and therefore transmits efficiently over the network, but it does not support cross-language use. Hessian is a serialization protocol that is widely used in the industry: it has good compatibility, works across languages, offers a convenient API, and produces a moderately sized byte stream. Protobuf, the default serialization protocol of the gRPC framework, was developed by Google; it is cross-language and cross-platform, has good extensibility, and performs better than Hessian, but developers must write proto files and compile them into language-specific code, which adds some complexity. Weighing these factors along with their track records, I personally recommend Hessian or Protobuf.
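
For reference, a minimal Hessian serialization sketch using the com.caucho.hessian Hessian2Output/Hessian2Input API, with error handling trimmed for brevity.

import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public final class HessianSerializer {

    public static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Hessian2Output out = new Hessian2Output(bos);
        out.writeObject(obj);   // encode the object graph into Hessian2 binary form
        out.flush();
        return bos.toByteArray();
    }

    public static Object deserialize(byte[] bytes) throws IOException {
        Hessian2Input in = new Hessian2Input(new ByteArrayInputStream(bytes));
        return in.readObject(); // decode the bytes back into an object
    }
}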

For further performance optimization of RPC framework serialization, the following methods can be used:

  • Reduce unnecessary fields and shorten the length of fields to reduce the number of bytes occupied after serialization.
  • Provide different serialization strategies. Different fields can be split into different threads for deserialization. For example, Netty I/O threads can only be responsible for deserializing the className and message header, and then distribute them to different business thread pools based on the header, with business threads responsible for deserializing the message body content. This can effectively reduce the pressure on I/O threads.

Memory management #

Netty uses off-heap DirectBuffer for socket read and write, reducing one memory copy compared to using on-heap memory. However, the creation and destruction costs of off-heap memory are higher, so memory pools are usually used to improve performance. You can review the Netty pooling technology introduced in the “Lightweight Object Recycling Station: Recycler Object Pooling Technology Analysis” course. For scenarios with small amounts of data, using HeapBuffer may be more efficient as JVM is responsible for memory allocation and recovery.
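
A sketch of the related allocator choices: the pooled allocator for channel reads and writes, and a plain heap buffer for small payloads (b is the ServerBootstrap from earlier).

// Use the pooled allocator (direct memory by default) for channel reads and writes
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

// For small, short-lived payloads a heap buffer managed by the JVM may be good enough
ByteBuf small = Unpooled.buffer(64);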

In addition, Netty provides some techniques to avoid memory copy:

  • CompositeByteBuf is a very important data structure in Netty for implementing zero-copy mechanism. It can combine multiple Buffer objects into a logical object, avoiding merging several Buffers into a large Buffer through traditional memory copy methods. We often use CompositeByteBuf to concatenate the header information (Header) and message body data (Body) of a protocol.
  • In failure retry scenarios, if you want to keep using the ByteBuf, you can use the copy() method to copy all the information of the original ByteBuf. However, deep copying is very inefficient, so you can use shallow copying operations like oldBuffer.duplicate().retain() to copy independent read and write indexes. The allocated memory and reference counts are shared with the original ByteBuf, and retain() will increase the reference count of the ByteBuffer, avoiding its release.
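
A sketch of both techniques; headerBuf, bodyBuf, and oldBuffer stand for ByteBuf instances you already hold.

// Zero-copy: stitch header and body into one logical buffer without copying bytes
CompositeByteBuf message = ByteBufAllocator.DEFAULT.compositeBuffer();
message.addComponents(true, headerBuf, bodyBuf);   // true: advance the writerIndex over both components

// Shallow copy for retries: independent reader/writer indexes, shared memory and reference count
ByteBuf retryCopy = oldBuffer.duplicate().retain();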

High availability #

In the entire RPC framework practice course, we did not focus much on high availability of the RPC framework. However, high availability is an important factor in distributed system architecture design. Below, let’s discuss together how to improve the availability of the RPC framework.

Connection Idle Detection + Heartbeat Detection #

Connection idle detection refers to periodically checking whether the connection has any data read or write. If the server keeps receiving data sent by the client connection, it indicates that the connection is active. For a dead connection, it won’t receive any data from the other end. However, if no data is received from the client within a certain period of time, it does not necessarily mean that the connection is dead. It is possible that the client simply doesn’t have any data to send for a long time, but the established connection is still in a healthy state. Therefore, the server also needs to determine if the client is alive through heartbeat detection. The client can send a heartbeat packet to the server at regular intervals. If no heartbeat data is received for N times, it can be concluded that the current client has gone offline or is in an unhealthy state. Therefore, connection idle detection and heartbeat detection are effective measures to deal with dead connections. The interval for idle detection should be larger than the interval for heartbeat detection by at least 2 cycles, mainly to avoid cases where the heartbeat packet fails to be received due to network jitter.

Netty provides an out-of-the-box implementation of IdleStateHandler for connection idle detection. If we want to close client connections that haven’t read any data within a certain time interval, we can implement it as shown below:

public class RpcIdleStateHandler extends IdleStateHandler {

    public RpcIdleStateHandler() {
        // Only the read-idle timeout is set: 60s without reading any data triggers channelIdle()
        super(60, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        // The connection has been read-idle for 60 seconds; close it to free resources
        ctx.channel().close();
    }
}

IdleStateHandler implements idle detection by scheduling tasks on the channel's task queue and checking whether channelRead() or write() has been invoked within the configured idle window. Its constructor lets you set separate idle times for read, write, and read/write. super(60, 0, 0, TimeUnit.SECONDS) means we only care about read idleness: if the server has not read any data for 60 seconds, channelIdle() is called, and at that point we can close the connection to avoid wasting resources.

There is no ready-made implementation for heartbeat detection in Netty, but its implementation principle is similar to idle detection. The client can use the schedule() method provided by EventLoop to add a scheduled task for sending heartbeat data to the task queue, as shown below:

public class RpcHeartBeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        doHeartBeatTask(ctx);
    }

    private void doHeartBeatTask(ChannelHandlerContext ctx) {
        // Schedule a recurring task on the channel's EventLoop: one heartbeat every 60 seconds
        ctx.executor().scheduleAtFixedRate(() -> {
            if (ctx.channel().isActive()) {
                // buildHeartBeatData() constructs the heartbeat message (definition omitted)
                HeartBeatData heartBeatData = buildHeartBeatData();
                ctx.writeAndFlush(heartBeatData);
            }
        }, 0, 60, TimeUnit.SECONDS);
    }
}

public class HeartbeatTask {

    // Alternative: drive the heartbeat from a standalone scheduler instead of the channel's EventLoop
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    private final Channel channel;

    public HeartbeatTask(Channel channel) {
        this.channel = channel;
    }

    public void start() {
        // Send a heartbeat every 10 seconds after an initial 10-second delay
        executorService.scheduleAtFixedRate(() -> {
            if (channel.isActive()) {
                channel.writeAndFlush(buildHeartBeatData());
            }
        }, 10, 10, TimeUnit.SECONDS);
    }
}

The client sends a heartbeat packet to the server at regular intervals, and the server does not need to reply: responding to every heartbeat costs resources, especially when the server holds a large number of client connections. If the client and the server both need to know whether the other side is alive, a bidirectional heartbeat mechanism is required. Choose the heartbeat scheme that best fits your actual scenario.

Thread Pool Isolation #

If your RPC service is a fundamental service within the company, it may have many callers, such as the user-facing and order-facing interfaces. In the RPC framework we implemented, the business thread pool is shared, and every RPC request is processed by that single pool. If one day the traffic from one caller surges and exhausts the thread pool, all other callers are severely affected as well. Instead, we can assign different callers to business thread pools of different tiers, isolating caller traffic by group so that an abnormal state in one caller cannot make the others unavailable, which improves the overall performance and availability of the service.

Traffic isolation is an important service governance measure and is already applied in many high-traffic business systems, such as flash-sale (seckill) systems, where a special request header marks whether a request is a flash-sale request or a regular one so that the two kinds of traffic can be isolated. For an RPC framework, how should service callers be grouped? Generally, the importance level of the calling application is a good grouping criterion: the core business must not be affected. For example, interfaces related to placing orders and making payments should have their own independent business thread pools so they are not dragged down by other callers. A minimal grouping sketch follows.
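
The sketch below groups callers into separate business thread pools; the group names and pool sizes are hypothetical.

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class CallerGroupExecutor {

    // One business thread pool per caller group; "core" callers (order, payment) are isolated from the rest
    private static final Map<String, ExecutorService> GROUP_POOLS = Map.of(
            "core",    Executors.newFixedThreadPool(64),
            "default", Executors.newFixedThreadPool(16));

    public static void submit(String callerGroup, Runnable rpcTask) {
        GROUP_POOLS.getOrDefault(callerGroup, GROUP_POOLS.get("default")).execute(rpcTask);
    }
}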

Retry Mechanism #

You must be very familiar with the retry mechanism, as you often use it in your project development. The retry mechanism is generally helpful in ensuring service stability and fault tolerance, as it can help solve many problems, such as network jitter and request timeouts.

There are several best practices and caveats around retries in RPC frameworks that I think are worth sharing:

  • The business logic of the invoked interface must be idempotent before a retry mechanism is even considered; for example, inserts and updates must produce the same result no matter how many duplicate requests arrive.
  • Although retries improve service availability, they also increase the traffic hitting service providers and, in extreme cases, can trigger an avalanche. Service callers should set reasonable invocation timeouts and retry counts, using the dependent interface's average response time, TP99 response time, and importance level as references. To guard against a retry-induced traffic storm, service providers must have protection measures such as circuit breaking, flow control, and degradation in place.
  • RPC frameworks generally adopt an exponential back-off strategy for retries, with exponentially increasing intervals such as 1s, 2s, 4s, 8s, while capping the maximum delay. Pure exponential back-off can still produce load spikes, for example when the provider hits a Full GC and many timed-out requests retry at the same moment; adding a random jitter to the retry interval spreads the requests across different points in time (see the sketch after this list).
  • When load balancing selects a node for a retry, nodes that failed in the previous attempt should be excluded to further raise the retry success rate.
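
A sketch of exponential back-off with random jitter; invokeRemote(), the retry count, and the 10-second delay cap are all illustrative, and java.util.concurrent.ThreadLocalRandom supplies the jitter.

static Object invokeWithRetry(Object request, int maxRetries) throws Exception {
    long baseDelayMs = 1000;                                         // first retry after ~1s
    for (int attempt = 0; ; attempt++) {
        try {
            return invokeRemote(request);                            // hypothetical remote call
        } catch (Exception e) {
            if (attempt >= maxRetries - 1) {
                throw e;                                             // retries exhausted
            }
            long backoff = Math.min(baseDelayMs << attempt, 10_000); // 1s, 2s, 4s ... capped at 10s
            long jitter = ThreadLocalRandom.current().nextLong(backoff / 2, backoff + 1);
            Thread.sleep(jitter);                                    // jitter spreads retries and avoids a retry storm
        }
    }
}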

Cluster Fault Tolerance #

Cluster fault tolerance refers to the handling strategy when exceptions occur during the service consumer's call to the service provider cluster. Taking the Dubbo framework as an example, it provides six built-in cluster fault tolerance measures.

  • Failover is the default cluster fault tolerance measure in Dubbo. When a call fails, it retries on other service nodes. The Failover strategy is suitable for idempotent operations, but as mentioned earlier, the side effect of retries is a large number of retry requests when the service provider has problems.
  • Failfast is suitable for non-idempotent operations. The service consumer makes a single call and, if it fails, throws an error immediately without any retries. The disadvantage of Failfast is that the service consumer needs to control the retry logic by itself.
  • Failsafe directly ignores exceptions. The Failsafe strategy is suitable for non-core operations such as monitoring and logging.
  • Failback records failed requests in a queue and periodically re-sends them. The Failback strategy is suitable for scenarios with low real-time requirements, such as message pushing.
  • Forking calls multiple service provider nodes in parallel and returns as soon as one call succeeds. The Forking strategy is usually used for operations with high real-time requirements and can reduce the TP999 metric, but it sacrifices some server resources.
  • Broadcast calls all service providers one by one and waits until all of them complete. If any of them fails, an exception is thrown. The Broadcast strategy is usually used to update the local resource state of service providers.

These cluster fault tolerance strategies can be selected according to the actual business scenario, and Dubbo also exposes a Cluster extension interface (SPI) for implementing custom fault tolerance modes.
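
For reference, a sketch of choosing a strategy with Dubbo's Java API (org.apache.dubbo.config.ReferenceConfig); UserService is a hypothetical business interface, and the same choice can be made via the cluster attribute in XML or annotation configuration.

ReferenceConfig<UserService> reference = new ReferenceConfig<>();
reference.setInterface(UserService.class);
reference.setCluster("failfast"); // failover / failfast / failsafe / failback / forking / broadcast
reference.setRetries(0);          // with failfast, no framework-level retries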

In addition, there are many other measures to achieve high availability of RPC frameworks, such as flow control, dynamic scaling, smooth restart, and service governance. Due to limitations in length, I will not go into detail here. It is not difficult to implement a prototype of an RPC framework, but ensuring the high performance, high availability, and easy scalability of an RPC framework requires continuous learning and accumulation of skills.

Conclusion #

To master a technology, you need to study its source code and accumulate long-term hands-on experience. For ease of learning, this column has implemented the basic functionality of a complete RPC framework, and there are still more advanced features waiting to be explored, such as service governance, thread pool isolation, cluster fault tolerance, and circuit breaking with flow control. Are you eager to dig deeper into RPC frameworks? Let's keep refining the practical project together and continuously improve our architecture design and coding skills!