12 Simple RPC Framework Implementation (Part 2) #

In the previous lesson, we introduced the structure and working principle of the entire simple RPC framework project, as well as the specific implementation of the underlying protocol structure, serialization/deserialization, compression, and codec. In this lesson, we will continue to introduce the remaining parts of the simple RPC framework implementation from bottom to top.

Implementation of the transport module #

As mentioned earlier when introducing the Netty thread model, we cannot perform time-consuming business logic in Netty’s I/O threads. In the Demo RPC framework, when the server receives a request, it first deserializes the request message using the DemoRpcDecoder we introduced earlier, and then a custom ChannelHandler (DemoRpcServerHandler) submits the request to a business thread pool for processing.

When the client side of the Demo RPC framework receives a response message, it also deserializes the response message using DemoRpcDecoder, and then returns the response to the upper-level business using a custom ChannelHandler (DemoRpcClientHandler).

Both DemoRpcServerHandler and DemoRpcClientHandler inherit from SimpleChannelInboundHandler, as shown in the diagram below:

Drawing 0.png

Inheritance diagram of DemoRpcClientHandler and DemoRpcServerHandler

Now let’s take a look at the implementation of these two custom ChannelHandlers:

public class DemoRpcServerHandler extends SimpleChannelInboundHandler<Message<Request>> {

    // Business thread pool
    static Executor executor = Executors.newCachedThreadPool();

    protected void channelRead0(final ChannelHandlerContext ctx, Message<Request> message) throws Exception {
        byte extraInfo = message.getHeader().getExtraInfo();
        if (Constants.isHeartBeat(extraInfo)) { // Heartbeat message, return it directly
            ctx.writeAndFlush(message);
            return;
        }
        // Non-heartbeat message: wrap it in a Runnable and submit it to the business thread pool
        executor.execute(new InvokeRunnable(message, ctx));
    }
}

public class DemoRpcClientHandler extends SimpleChannelInboundHandler<Message<Response>> {
    protected void channelRead0(ChannelHandlerContext ctx, Message<Response> message) throws Exception {
        NettyResponseFuture responseFuture = Connection.IN_FLIGHT_REQUEST_MAP.remove(message.getHeader().getMessageId());
        Response response = message.getContent();
        // Special handling for heartbeat messages
        if (response == null && Constants.isHeartBeat(message.getHeader().getExtraInfo())) {
            response = new Response();
            response.setCode(Constants.HEARTBEAT_CODE);
        }
        responseFuture.getPromise().setSuccess(response);
    }
}

Two points here deserve further explanation. The first is the InvokeRunnable on the server side. In this Runnable task, we look up and invoke the target method based on the serviceName, methodName, and parameter information carried in the request:

class InvokeRunnable implements Runnable {
    private ChannelHandlerContext ctx;
    private Message<Request> message;

    public InvokeRunnable(Message<Request> message, ChannelHandlerContext ctx) {
        this.message = message;
        this.ctx = ctx;
    }

    public void run() {
        Response response = new Response();
        Object result = null;
        try {
            Request request = message.getContent();
            String serviceName = request.getServiceName();
            // BeanManager manages all the business beans; under the hood it maintains a collection of
            // business bean instances in memory. Interested readers can try integrating Spring or another
            // container to manage the business beans.
            Object bean = BeanManager.getBean(serviceName);
            // Use reflection to invoke the corresponding method on the bean
            Method method = bean.getClass().getMethod(request.getMethodName(), request.getArgTypes());
            result = method.invoke(bean, request.getArgs());
        } catch (Exception e) {
            // Omitted exception handling
        }
        response.setResult(result); // Set the response result
        // Return the response message to the client
        ctx.writeAndFlush(new Message(message.getHeader(), response));
    }
}
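The BeanManager referenced here is not listed in this lesson. Below is a minimal sketch of what it might look like, assuming a simple in-memory map keyed by service name (the real project may organize it differently):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BeanManager {
    // In-memory registry of business beans, keyed by service name
    private static final Map<String, Object> BEANS = new ConcurrentHashMap<>();

    public static void registerBean(String serviceName, Object bean) {
        BEANS.put(serviceName, bean); // Called by the Provider when exposing a service
    }

    public static Object getBean(String serviceName) {
        return BEANS.get(serviceName); // Called by InvokeRunnable to locate the target bean
    }
}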

The second is the Connection on the client side. It temporarily stores the requests that have been sent out but have not yet received a response, so that when a response arrives we can find the corresponding request and its future, and return the result to the upper-level business logic. The specific implementation is as follows:

public class Connection implements Closeable {
    private static AtomicLong ID_GENERATOR = new AtomicLong(0);
    public static Map<Long, NettyResponseFuture<Response>> IN_FLIGHT_REQUEST_MAP = new ConcurrentHashMap<>();
    private ChannelFuture future;
    private AtomicBoolean isConnected = new AtomicBoolean();
    public Connection(ChannelFuture future, boolean isConnected) {
        this.future = future;
        this.isConnected.set(isConnected);
    }
    public NettyResponseFuture<Response> request(Message<Request> message, long timeOut) {
        // Generate and set the message ID
        long messageId = ID_GENERATOR.incrementAndGet();
        message.getHeader().setMessageId(messageId);
        // Create a future associated with the message
        NettyResponseFuture responseFuture = new NettyResponseFuture(System.currentTimeMillis(), timeOut, message, future.channel(), new DefaultPromise(new DefaultEventLoop()));
        // Record the message ID and the associated Future in the IN_FLIGHT_REQUEST_MAP collection
        IN_FLIGHT_REQUEST_MAP.put(messageId, responseFuture);
        try {
            future.channel().writeAndFlush(message); // Send the request
        } catch (Exception e) {
            // When an exception occurs while sending the request, remove the corresponding Future
            IN_FLIGHT_REQUEST_MAP.remove(messageId);
            throw e;
        }
        return responseFuture;
    }

    // Omitted getter/setter and close() methods
}
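The NettyResponseFuture class used above is not listed in this lesson either. Here is a minimal sketch that is consistent with the constructor and getPromise() calls shown above; the field names and accessors are assumptions:

import io.netty.channel.Channel;
import io.netty.util.concurrent.Promise;

public class NettyResponseFuture<T> {
    private final long createTime;    // Time the request was sent
    private final long timeOut;       // Timeout in milliseconds
    private final Message message;    // The request message
    private final Channel channel;    // Channel the request was written to
    private final Promise<T> promise; // Completed when the response arrives

    public NettyResponseFuture(long createTime, long timeOut, Message message,
                               Channel channel, Promise<T> promise) {
        this.createTime = createTime;
        this.timeOut = timeOut;
        this.message = message;
        this.channel = channel;
        this.promise = promise;
    }

    public Promise<T> getPromise() { return promise; }
    public long getCreateTime() { return createTime; }
    public long getTimeOut() { return timeOut; }
}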

Note that the Connection class never removes entries from the IN_FLIGHT_REQUEST_MAP collection. If a response never arrives, the map keeps growing and will eventually cause an OOM. Adding a timer that periodically cleans up expired request entries would solve this; we won’t build it into the framework here.
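A possible sketch of such a cleanup task is shown below. It is not part of the demo code; the InFlightRequestCleaner class is hypothetical, and it relies on the getCreateTime()/getTimeOut() accessors assumed in the NettyResponseFuture sketch above:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class InFlightRequestCleaner {
    private static final ScheduledExecutorService CLEANER =
            Executors.newSingleThreadScheduledExecutor();

    public static void start() {
        CLEANER.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            Connection.IN_FLIGHT_REQUEST_MAP.entrySet().removeIf(entry -> {
                NettyResponseFuture<Response> future = entry.getValue();
                boolean expired = now - future.getCreateTime() > future.getTimeOut();
                if (expired) { // Fail the promise so the waiting caller is unblocked
                    future.getPromise().tryFailure(
                            new TimeoutException("request " + entry.getKey() + " timed out"));
                }
                return expired; // removeIf() drops expired entries from the map
            });
        }, 1, 1, TimeUnit.SECONDS);
    }
}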

After writing the custom ChannelHandlers, we need to define two more classes, DemoRpcClient and DemoRpcServer, which serve as the entry points for the client and server, respectively. The implementation of DemoRpcClient is as follows:

public class DemoRpcClient implements Closeable {

    protected Bootstrap clientBootstrap;
    protected EventLoopGroup group;
    private String host;
    private int port;

    public DemoRpcClient(String host, int port) throws Exception {
        this.host = host;
        this.port = port;
        clientBootstrap = new Bootstrap();
        // Create and configure the clientBootstrap
        group = NettyEventLoopFactory.eventLoopGroup(
                Constants.DEFAULT_IO_THREADS, "NettyClientWorker");
        clientBootstrap.group(group)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .channel(NioSocketChannel.class)
                // Specify the ChannelHandlers and their order
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast("demo-rpc-encoder", new DemoRpcEncoder());
                        ch.pipeline().addLast("demo-rpc-decoder", new DemoRpcDecoder());
                        ch.pipeline().addLast("client-handler", new DemoRpcClientHandler());
                    }
                });
    }

    public ChannelFuture connect() { // Connect to the specified address and port
        ChannelFuture connect = clientBootstrap.connect(host, port);
        connect.awaitUninterruptibly();
        return connect;
    }

    public void close() {
        group.shutdownGracefully();
    }
}

From the code of DemoRpcClient, we can see that the execution order of its ChannelHandlers is as follows:

Lark20200904-143159.png

Client ChannelHandler structure

In addition, we don’t create the EventLoopGroup with NioEventLoopGroup directly; instead, NettyEventLoopFactory chooses the implementation based on the current operating system: on Linux it uses EpollEventLoopGroup, while other systems fall back to NioEventLoopGroup.
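NettyEventLoopFactory itself is not listed in this lesson. A minimal sketch of what it might look like, assuming the choice is based on the os.name system property and Netty’s Epoll.isAvailable() check (everything other than the eventLoopGroup() method used above is an assumption):

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ThreadFactory;

public class NettyEventLoopFactory {

    public static EventLoopGroup eventLoopGroup(int threads, String threadFactoryName) {
        ThreadFactory threadFactory = new DefaultThreadFactory(threadFactoryName, true);
        // Prefer the native epoll transport on Linux, fall back to NIO elsewhere
        return useEpoll()
                ? new EpollEventLoopGroup(threads, threadFactory)
                : new NioEventLoopGroup(threads, threadFactory);
    }

    private static boolean useEpoll() {
        String osName = System.getProperty("os.name", "");
        return osName.toLowerCase().contains("linux") && Epoll.isAvailable();
    }
}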

Next, let’s take a look at the implementation of DemoRpcServer:

public class DemoRpcServer {

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap serverBootstrap;
    private Channel channel;
    protected int port;

    public DemoRpcServer(int port) throws InterruptedException {
        this.port = port;
        // Create the boss and worker EventLoopGroups; note that the number of
        // worker threads is derived from the number of CPU cores
        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "boss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                Math.min(Runtime.getRuntime().availableProcessors() + 1, 32),
                "worker");
        serverBootstrap = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .handler(new LoggingHandler(LogLevel.INFO))
                // Specify the ChannelHandlers (and their order) to be registered on each Channel
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast("demo-rpc-decoder", new DemoRpcDecoder());
                        ch.pipeline().addLast("demo-rpc-encoder", new DemoRpcEncoder());
                        ch.pipeline().addLast("server-handler", new DemoRpcServerHandler());
                    }
                });
    }

    public ChannelFuture start() throws InterruptedException {
        ChannelFuture channelFuture = serverBootstrap.bind(port);
        channel = channelFuture.channel();
        channel.closeFuture();
        return channelFuture;
    }
}

From the implementation of DemoRpcServer, we can see that the ChannelHandlers on each Channel are arranged in the following order:

Lark20200904-143204.png

Server ChannelHandler structure

Implementation of the registry #

After introducing the communication between the client and the server, let’s take a look at another fundamental capability of the simple RPC framework - service registration and discovery, which corresponds to the registry package in the demo-rpc project.

The registry package relies mainly on Apache Curator as a simplified ZooKeeper client, and on top of ZooKeeper it implements the two basic functions of a registry: provider registration and consumer subscription.

Here we first define a Registry interface, which provides methods for service registration and querying instances, as shown in the following diagram:

Drawing 3.png
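The diagram only shows the method names, so here is a rough sketch of what the Registry interface might look like, inferred from the methods that ZookeeperRegistry implements below (the exact signatures are an assumption):

import java.util.List;
import org.apache.curator.x.discovery.ServiceInstance;

public interface Registry<T> {
    // Register a provider instance with the registry
    void registerService(ServiceInstance<T> service) throws Exception;
    // Remove a previously registered provider instance
    void unregisterService(ServiceInstance<T> service) throws Exception;
    // Query all provider instances registered under the given service name
    List<ServiceInstance<T>> queryForInstances(String name) throws Exception;
}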

ZookeeperRegistry is an implementation of the Registry interface based on curator-x-discovery. It encapsulates the ServiceDiscovery introduced earlier and adds a ServiceCache on top of it to improve query efficiency. The specific implementation of ZookeeperRegistry is as follows:

public class ZookeeperRegistry<T extends ServerInfo> implements Registry<T> {

    private InstanceSerializer serializer = new JsonInstanceSerializer<>(ServerInfo.class);
    private ServiceDiscovery<T> serviceDiscovery;
    private ServiceCache<T> serviceCache;
    private String address = "localhost:2181";

    public void start() throws Exception {
        String root = "/demo/rpc";

        // Initialize CuratorFramework
        CuratorFramework client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3));
        client.start(); // Start Curator client
        client.blockUntilConnected();  // Block the current thread until connected
        client.createContainers(root);

        // Initialize ServiceDiscovery
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerInfo.class)
                .client(client).basePath(root)
                .serializer(serializer)
                .build();
        serviceDiscovery.start(); // Start ServiceDiscovery

        // Create ServiceCache to monitor the changes of corresponding nodes in ZooKeeper and facilitate subsequent reads
        serviceCache = serviceDiscovery.serviceCacheBuilder()
                .name(root)
                .build();
        serviceCache.start(); // Start ServiceCache
    }

    @Override
    public void registerService(ServiceInstance<T> service) throws Exception {
        serviceDiscovery.registerService(service);
    }

    @Override
    public void unregisterService(ServiceInstance service) throws Exception {
        serviceDiscovery.unregisterService(service);
    }

    @Override
    public List<ServiceInstance<T>> queryForInstances(String name) throws Exception {
        // Filter the cached data in ServiceCache directly based on name
        return serviceCache.getInstances().stream()
                .filter(s -> s.getName().equals(name))
                .collect(Collectors.toList());
    }
}

As the ZookeeperRegistry code shows, it interacts with ZooKeeper through Curator’s ServiceDiscovery component: the Registry interface methods simply delegate to ServiceDiscovery, and queries read the data cached in ServiceCache. ServiceCache maintains a local ConcurrentHashMap cache and uses PathChildrenCache to listen for changes to the corresponding child nodes in ZooKeeper, keeping the local cache in sync. Let’s take a look at the core implementation of ServiceCache:

public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener {
    // Associated ServiceDiscovery instance
    private final ServiceDiscoveryImpl<T> discovery;
    // Underlying PathChildrenCache for listening to changes in child nodes
    private final PathChildrenCache cache; 
    // Local cache
    private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();

    public List<ServiceInstance<T>> getInstances() {
        // Return the content of the local cache
        return Lists.newArrayList(instances.values());
    }

    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        boolean notifyListeners = false;
        switch (event.getType()) {
            case CHILD_ADDED:
            case CHILD_UPDATED: {
                addInstance(event.getData(), false); // Update the local cache
                notifyListeners = true;
                break;
            }
            case CHILD_REMOVED: { // Update the local cache
                instances.remove(instanceIdFromData(event.getData()));
                notifyListeners = true;
                break;
            }
        }
        // If notifyListeners is true, notify the listeners registered on ServiceCache (omitted)
    }
}

Proxy implementation #

In the simplified version of the Demo RPC framework, the Proxy is mainly used to create a proxy for the Client side, which helps the client program shield the underlying network operations and interactions with the registry center.

The simplified Demo RPC framework uses JDK dynamic proxies to generate the proxy, which requires an implementation of the InvocationHandler interface: DemoRpcProxy in the code. DemoRpcProxy has two core methods: newInstance(), which generates the proxy object, and invoke(), which executes the proxy logic whenever a method is called on the proxy object.

Here is the implementation of DemoRpcProxy:

public class DemoRpcProxy implements InvocationHandler {
    // Name of the service (interface) to be proxied
    private String serviceName;
    // Used for interaction with ZooKeeper, with built-in cache
    private Registry<ServerInfo> registry;

    public DemoRpcProxy(String serviceName, Registry<ServerInfo> registry) throws Exception {
        // Initialize the above two fields
        this.serviceName = serviceName;
        this.registry = registry;
    }

    public static <T> T newInstance(Class<T> clazz, Registry<ServerInfo> registry) throws Exception {
        // Create the proxy object
        return (T) Proxy.newProxyInstance(
                Thread.currentThread().getContextClassLoader(),
                new Class[]{clazz},
                new DemoRpcProxy(clazz.getName(), registry));
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Get the available Server addresses from the ZooKeeper cache and pick one at random
        List<ServiceInstance<ServerInfo>> serviceInstances =
                registry.queryForInstances(serviceName);
        ServiceInstance<ServerInfo> serviceInstance = serviceInstances
                .get(ThreadLocalRandom.current().nextInt(serviceInstances.size()));
        // Create the request message, then call remoteCall() to send it to the selected Server
        String methodName = method.getName();
        Header header = new Header(MAGIC, VERSION_1...);
        Message<Request> message = new Message(header,
                new Request(serviceName, methodName, args));
        return remoteCall(serviceInstance.getPayload(), message);
    }

    protected Object remoteCall(ServerInfo serverInfo, Message message) throws Exception {
        if (serverInfo == null) {
            throw new RuntimeException("get available server error");
        }
        // Create a DemoRpcClient and connect to the selected Server
        DemoRpcClient demoRpcClient = new DemoRpcClient(
                serverInfo.getHost(), serverInfo.getPort());
        ChannelFuture channelFuture = demoRpcClient.connect().awaitUninterruptibly();
        // Create the corresponding Connection object and send the request
        Connection connection = new Connection(channelFuture, true);
        NettyResponseFuture responseFuture =
                connection.request(message, Constants.DEFAULT_TIMEOUT);
        // Wait for the response to this request
        return responseFuture.getPromise().get(
                Constants.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
    }
}

From the implementation of DemoRpcProxy, we can see that it depends on the Registry (whose ZooKeeper implementation reads from ServiceCache) to obtain the Server addresses registered in ZooKeeper, and it relies on DemoRpcClient to communicate with the selected Server. After obtaining the proxy object, the upper-level client can call it as if it were a local method, without having to worry about the details of the underlying network communication and service discovery. Of course, there are many places in this simplified version of DemoRpcProxy that can be optimized, for example:

  • Caching the DemoRpcClient client object and the corresponding Connection object, so that they don’t need to be created every time.
  • Adding a failure retry mechanism to retry the request in case of a timeout.
  • Adding more complex and flexible load balancing mechanisms, such as load balancing based on hash value scattering, load balancing based on node load, etc.

If you are interested, you can try extending it into a more complete proxy layer; a minimal sketch of the first optimization (caching connections) is shown below.
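As an illustration of the first optimization, here is a sketch that caches one Connection per server address. The ConnectionManager class and its key format are hypothetical, not part of the demo project:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConnectionManager {
    // One cached Connection per "host:port" address
    private static final Map<String, Connection> CONNECTIONS = new ConcurrentHashMap<>();

    public static Connection getConnection(ServerInfo serverInfo) {
        String key = serverInfo.getHost() + ":" + serverInfo.getPort();
        // Create the client and connect only the first time this address is used
        return CONNECTIONS.computeIfAbsent(key, k -> {
            try {
                DemoRpcClient client = new DemoRpcClient(
                        serverInfo.getHost(), serverInfo.getPort());
                return new Connection(client.connect(), true);
            } catch (Exception e) {
                throw new RuntimeException("failed to connect to " + k, e);
            }
        });
    }
}

With such a cache in place, remoteCall() could call ConnectionManager.getConnection(serverInfo) instead of creating a new DemoRpcClient and Connection on every invocation.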

Integration with Clients #

After introducing the implementation of Demo RPC, let’s talk about how to use the Demo RPC framework. This involves three classes: Consumer, DemoServiceImpl, and Provider, as well as the DemoService business interface.

Drawing 4.png

Classes used for integration

First, we define the DemoService interface as the Server-side business interface. The specific definition is as follows:

public interface DemoService {
    String sayHello(String param);
}

The implementation of the DemoService interface, DemoServiceImpl, is also very simple: it prefixes the parameter and returns the result:

public class DemoServiceImpl implements DemoService {
    public String sayHello(String param) {
        return "hello:" + param;
    }
}

After understanding the corresponding business interface and implementation, let’s take a look at the implementation of Provider. It plays a similar role to the Provider in Dubbo. It creates a DemoServiceImpl business bean and exposes its own address information. The code is shown below:

public class Provider {
    public static void main(String[] args) throws Exception {
        // Create DemoServiceImpl and register it in BeanManager
        BeanManager.registerBean("demoService", new DemoServiceImpl());
        // Create ZookeeperRegistry and register the Provider's address information (ServerInfo) in ZooKeeper
        ZookeeperRegistry<ServerInfo> discovery = new ZookeeperRegistry<>();
        discovery.start();
        ServerInfo serverInfo = new ServerInfo("127.0.0.1", 20880);
        discovery.registerService(
                ServiceInstance.<ServerInfo>builder().name("demoService")
                        .payload(serverInfo).build());
        // Start DemoRpcServer and wait for client requests
        DemoRpcServer rpcServer = new DemoRpcServer(20880);
        rpcServer.start();
    }
}

Finally, let’s talk about the consumer. It is similar to the Consumer in Dubbo. It subscribes to the address information of the Providers, then selects a Provider, establishes a connection, sends a request, and gets the response. These processes are encapsulated in the Proxy. The implementation of the Consumer is very simple, as shown in the example code below:

public class Consumer {
    public static void main(String[] args) throws Exception {
        // Create and start the ZookeeperRegistry object
        ZookeeperRegistry<ServerInfo> discovery = new ZookeeperRegistry<>();
        discovery.start(); // Start it so the ServiceCache is initialized before any query
        // Create the proxy object and invoke the remote Server through the proxy
        DemoService demoService = DemoRpcProxy.newInstance(DemoService.class, discovery);
        // Invoke the sayHello() method and print the result
        String result = demoService.sayHello("hello");
        System.out.println(result);
    }
}
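To try the demo end to end, start a ZooKeeper instance reachable at localhost:2181 (the default address used by ZookeeperRegistry), run Provider first, and then run Consumer; the Consumer should print hello:hello.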

Conclusion #

In this lesson, we first introduced the transport package of the simplified RPC framework, which implements client and server communication on top of the codec introduced in the previous lesson. Then we explained how the registry package interacts with ZooKeeper to give the simplified RPC framework service registration and discovery capabilities. Next, we analyzed the implementation of the proxy package, which uses JDK dynamic proxies to hide the complexity of the underlying network communication. Finally, we wrote a simple DemoService business interface, as well as the corresponding Provider and Consumer, to integrate with the simplified RPC framework.

At the end of this lesson, I have a little question for you: why does the creation of the EventLoopGroup in the transport package differ for Linux systems? I’m looking forward to your comments.

Link to the simplified RPC framework Demo: https://github.com/xxxlxy2008/demo-prc.