12 Simple RPC Framework Implementation Below #
In the previous lesson, we introduced the structure and working principle of the entire simple RPC framework project, as well as the specific implementation of the underlying protocol structure, serialization/deserialization, compression, and codec. In this lesson, we will continue to introduce the remaining parts of the simple RPC framework implementation from bottom to top.
Implementation of transport module #
As mentioned earlier when introducing the Netty thread model, we cannot perform time-consuming business logic in Netty’s I/O thread. In the Demo RPC framework, when the server side receives a request, it will first deserialize the request message using the DemoRpcDecoder we introduced earlier. Then, we will submit the request to a business thread pool for processing using a custom ChannelHandler (DemoRpcServerHandler).
When the client side of the Demo RPC framework receives a response message, it also deserializes the response message using DemoRpcDecoder, and then returns the response to the upper-level business using a custom ChannelHandler (DemoRpcClientHandler).
Both DemoRpcServerHandler and DemoRpcClientHandler inherit from SimpleChannelInboundHandler, as shown in the diagram below:
Inheritance diagram of DemoRpcClientHandler and DemoRpcServerHandler
Now let’s take a look at the implementation of these two custom ChannelHandlers:
public class DemoRpcServerHandler extends SimpleChannelInboundHandler<Message<Request>> {
// Business thread pool
static Executor executor = Executors.newCachedThreadPool();
protected void channelRead0(final ChannelHandlerContext ctx, Message<Request> message) throws Exception {
byte extraInfo = message.getHeader().getExtraInfo();
if (Constants.isHeartBeat(extraInfo)) { // Heartbeat message, return directly
channelHandlerContext.writeAndFlush(message);
return;
}
// Non-heartbeat message, encapsulate it as a Runnable and submit it to the business thread
executor.execute(new InvokeRunnable(message, cxt));
}
}
public class DemoRpcClientHandler extends SimpleChannelInboundHandler<Message<Response>> {
protected void channelRead0(ChannelHandlerContext ctx, Message<Response> message) throws Exception {
NettyResponseFuture responseFuture = Connection.IN_FLIGHT_REQUEST_MAP.remove(message.getHeader().getMessageId());
Response response = message.getContent();
// Special handling for heartbeat messages
if (response == null && Constants.isHeartBeat(message.getHeader().getExtraInfo())) {
response = new Response();
response.setCode(Constants.HEARTBEAT_CODE);
}
responseFuture.getPromise().setSuccess(response);
}
}
Note that there are two points that need to be explained. The first point is the InvokeRunnable on the server side. In this Runnable task, we will call the corresponding method based on the serviceName, methodName, and parameter information of the request:
class InvokeRunnable implements Runnable {
private ChannelHandlerContext ctx;
private Message<Request> message;
public void run() {
Response response = new Response();
Object result = null;
try {
Request request = message.getContent();
String serviceName = request.getServiceName();
// Here, BeanManager is used to manage all the business beans. Underlying, it maintains a collection of business bean instances in memory. Interested students can try integrating Spring and other container to manage business beans.
Object bean = BeanManager.getBean(serviceName);
// The following code uses reflection to invoke the corresponding method in the bean
Method method = bean.getClass().getMethod(request.getMethodName(), request.getArgTypes());
result = method.invoke(bean, request.getArgs());
} catch (Exception e) {
// Omitted exception handling
} finally {
}
response.setResult(result); // Set the response result
// Return the response message to the client
ctx.writeAndFlush(new Message(message.getHeader(), response));
}
}
The second point is the Connection on the client side. It is used to temporarily store the requests that have been sent out but have not received a response. Therefore, when a response is returned, we can find the corresponding request and future, and return the response result to the upper-level business logic. The specific implementation is as follows:
public class Connection implements Closeable {
private static AtomicLong ID_GENERATOR = new AtomicLong(0);
public static Map<Long, NettyResponseFuture<Response>> IN_FLIGHT_REQUEST_MAP = new ConcurrentHashMap<>();
private ChannelFuture future;
private AtomicBoolean isConnected = new AtomicBoolean();
public Connection(ChannelFuture future, boolean isConnected) {
this.future = future;
this.isConnected.set(isConnected);
}
public NettyResponseFuture<Response> request(Message<Request> message, long timeOut) {
// Generate and set the message ID
long messageId = ID_GENERATOR.incrementAndGet();
message.getHeader().setMessageId(messageId);
// Create a future associated with the message
NettyResponseFuture responseFuture = new NettyResponseFuture(System.currentTimeMillis(), timeOut, message, future.channel(), new DefaultPromise(new DefaultEventLoop()));
// Record the message ID and the associated Future in the IN_FLIGHT_REQUEST_MAP collection
IN_FLIGHT_REQUEST_MAP.put(messageId, responseFuture);
try {
future.channel().writeAndFlush(message); // Send the request
} catch (Exception e) {
// When an exception occurs while sending the request, remove the corresponding Future
IN_FLIGHT_REQUEST_MAP.remove(messageId);
throw e;
}
return responseFuture;
}
// Omitted getter/setter and close() methods
}
We can see that there is no operation in the Connection class to clean the IN_FLIGHT_REQUEST_MAP collection. When it fails to get a response, the IN_FLIGHT_REQUEST_MAP will keep growing and eventually cause an OOM. You can add a timer to clean up expired request messages periodically, but we won’t go into detail here.
After writing the custom ChannelHandler, we need to define two more classes: DemoRpcClient and DemoRpcServer, which serve as the entry points for the Client and Server, respectively. The implementation of DemoRpcClient is as follows:
public class DemoRpcClient implements Closeable {
protected Bootstrap clientBootstrap;
protected EventLoopGroup group;
private String host;
private int port;
public DemoRpcClient(String host, int port) throws Exception {
this.host = host;
this.port = port;
clientBootstrap = new Bootstrap();
// Create and configure the clientBootstrap
group = NettyEventLoopFactory.eventLoopGroup(
Constants.DEFAULT_IO_THREADS, "NettyClientWorker");
clientBootstrap.group(group)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.channel(NioSocketChannel.class)
// Specify the order of the ChannelHandler
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast("demo-rpc-encoder",
new DemoRpcEncoder());
ch.pipeline().addLast("demo-rpc-decoder",
new DemoRpcDecoder());
ch.pipeline().addLast("client-handler",
new DemoRpcClientHandler());
}
});
}
public ChannelFuture connect() { // Connect to the specified address and port
ChannelFuture connect = clientBootstrap.connect(host, port);
connect.awaitUninterruptibly();
return connect;
}
public void close() {
group.shutdownGracefully();
}
}
From the code of DemoRpcClient, we can see that the execution order of its ChannelHandlers is as follows:
Client ChannelHandler structure
In addition, when creating the EventLoopGroup, we don’t use NioEventLoopGroup directly, but choose based on the current operating system in NettyEventLoopFactory. For Linux systems, EpollEventLoopGroup is used, while other systems use NioEventLoopGroup.
Next, let’s take a look at the implementation of DemoRpcServer:
public class DemoRpcServer {
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ServerBootstrap serverBootstrap;
private Channel channel;
protected int port;
public DemoRpcServer(int port) throws InterruptedException {
this.port = port;
// Create boos and worker EventLoopGroups, pay attention to some details,
// workerGroup calculates the number of threads based on the number of CPU cores,
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "boos");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
Math.min(Runtime.getRuntime().availableProcessors() + 1,
32), "worker");
serverBootstrap = new ServerBootstrap().group(bossGroup,
workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>()
{ // Specify the ChannelHandlers and their order to be registered on each Channel
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast("demp-rpc-decoder",
new DemoRpcDecoder());
ch.pipeline().addLast("demo-rpc-encoder",
new DemoRpcEncoder());
ch.pipeline().addLast("server-handler",
new DemoRpcServerHandler());
}
});
}
public ChannelFuture start() throws InterruptedException {
ChannelFuture channelFuture = serverBootstrap.bind(port);
channel = channelFuture.channel();
channel.closeFuture();
return channelFuture;
}
}
By analyzing the implementation of DemoRpcServer, we can know that the order of ChannelHandlers on each Channel is as follows:
Server ChannelHandler structure
Implementation of the registry #
After introducing the communication between the client and the server, let’s take a look at another fundamental capability of the simple RPC framework - service registration and discovery, which corresponds to the registry package in the demo-rpc project.
The registry package mainly depends on Apache Curator to implement a simplified version of the ZooKeeper client, and based on ZooKeeper, it implements the two basic functions of the registry: provider registration and consumer subscription.
Here we first define a Registry interface, which provides methods for service registration and querying instances, as shown in the following diagram:
ZooKeeperRegistry is an implementation of the Registry interface based on curator-x-discovery. It encapsulates the ServiceDiscovery introduced earlier and adds a ServiceCache on top of it to improve query efficiency. The specific implementation of ZooKeeperRegistry is as follows: public class ZookeeperRegistry implements Registry {
private InstanceSerializer serializer = new JsonInstanceSerializer<>(ServerInfo.class);
private ServiceDiscovery<T> serviceDiscovery;
private ServiceCache<T> serviceCache;
private String address = "localhost:2181";
public void start() throws Exception {
String root = "/demo/rpc";
// Initialize CuratorFramework
CuratorFramework client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3));
client.start(); // Start Curator client
client.blockUntilConnected(); // Block the current thread until connected
client.createContainers(root);
// Initialize ServiceDiscovery
serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerInfo.class)
.client(client).basePath(root)
.serializer(serializer)
.build();
serviceDiscovery.start(); // Start ServiceDiscovery
// Create ServiceCache to monitor the changes of corresponding nodes in ZooKeeper and facilitate subsequent reads
serviceCache = serviceDiscovery.serviceCacheBuilder()
.name(root)
.build();
serviceCache.start(); // Start ServiceCache
}
@Override
public void registerService(ServiceInstance<T> service) throws Exception {
serviceDiscovery.registerService(service);
}
@Override
public void unregisterService(ServiceInstance service) throws Exception {
serviceDiscovery.unregisterService(service);
}
@Override
public List<ServiceInstance<T>> queryForInstances(String name) throws Exception {
// Filter the cached data in ServiceCache directly based on name
return serviceCache.getInstances().stream()
.filter(s -> s.getName().equals(name))
.collect(Collectors.toList());
}
}
By analyzing the ZookeeperRegistry
, it can be inferred that it interacts with ZooKeeper through the ServiceDiscovery
component in Curator. The implementation of the Registry
interface is also achieved by directly calling the relevant methods of ServiceDiscovery
. When querying, it directly reads the cached data in ServiceCache
. ServiceCache
maintains a ConcurrentHashMap
cache locally, and listens to the changes of each child node in ZooKeeper through PathChildrenCache
, and synchronously updates the local cache. Let’s take a look at the core implementation of ServiceCache
:
public class ServiceCacheImpl<T> implements ServiceCache<T>, PathChildrenCacheListener {
// Associated ServiceDiscovery instance
private final ServiceDiscoveryImpl<T> discovery;
// Underlying PathChildrenCache for listening to changes in child nodes
private final PathChildrenCache cache;
// Local cache
private final ConcurrentMap<String, ServiceInstance<T>> instances = Maps.newConcurrentMap();
public List<ServiceInstance<T>> getInstances() {
// Return the content of the local cache
return Lists.newArrayList(instances.values());
}
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch(event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED: {
addInstance(event.getData(), false); // Update the local cache
notifyListeners = true;
break;
}
case CHILD_REMOVED: { // Update the local cache
instances.remove(instanceIdFromData(event.getData()));
notifyListeners = true;
break;
}
}
// Notify the listeners registered on ServiceCache
}
}
Proxy implementation #
In the simplified version of the Demo RPC framework, the Proxy is mainly used to create a proxy for the Client side, which helps the client program shield the underlying network operations and interactions with the registry center.
In the simplified version of Demo RPC, JDK dynamic proxy is used to generate the proxy. Here, an implementation of the InvocationHandler interface needs to be written, which is DemoRpcProxy in the code. There are two core methods in DemoRpcProxy: newInstance()
method, used to generate the proxy object; invoke()
method, when the target object is called, the proxy logic in the invoke()
method will be executed.
Here is the implementation of DemoRpcProxy:
public class DemoRpcProxy implements InvocationHandler {
// Name of the service (interface) to be proxied
private String serviceName;
// Used for interaction with ZooKeeper, with built-in cache
private Registry<ServerInfo> registry;
public DemoRpcProxy(String serviceName, Registry<ServerInfo> registry) throws Exception {
// Initialize the above two fields
this.serviceName = serviceName;
this.registry = registry;
}
public static <T> T newInstance(Class<T> clazz, Registry<ServerInfo> registry) throws Exception {
// Create the proxy object
return (T) Proxy.newProxyInstance(Thread.currentThread()
[cut off]
.getContextClassLoader(), new Class[]{clazz},
new DemoRpcProxy(clazz.getName(), registry));
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
// 从Zookeeper缓存中获取可用的Server地址,并随机从中选择一个
List<ServiceInstance<ServerInfo>> serviceInstances =
registry.queryForInstances(serviceName);
ServiceInstance<ServerInfo> serviceInstance = serviceInstances
.get(ThreadLocalRandom.current()
.nextInt(serviceInstances.size()));
// 创建请求消息,然后调用remoteCall()方法请求上面选定的Server端
String methodName = method.getName();
Header header =new Header(MAGIC, VERSION_1...);
Message<Request> message = new Message(header,
new Request(serviceName, methodName, args));
return remoteCall(serviceInstance.getPayload(), message);
}
protected Object remoteCall(ServerInfo serverInfo,
Message message) throws Exception {
if (serverInfo == null) {
throw new RuntimeException("get available server error");
}
// 创建DemoRpcClient连接指定的Server端
DemoRpcClient demoRpcClient = new DemoRpcClient(
serverInfo.getHost(), serverInfo.getPort());
ChannelFuture channelFuture = demoRpcClient.connect()
.awaitUninterruptibly();
// 创建对应的Connection对象,并发送请求
Connection connection = new Connection(channelFuture, true);
NettyResponseFuture responseFuture =
connection.request(message, Constants.DEFAULT_TIMEOUT);
// 等待请求对应的响应
return responseFuture.getPromise().get(
Constants.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
}
}
From the implementation of the DemoRpcProxy
, we can see that it depends on ServiceInstanceCache
to get the Server address registered in ZooKeeper, and it also relies on DemoRpcClient
to communicate with the Server. After obtaining the proxy object, the upper-level client can call it as if it were a local method, without having to worry about the details of underlying network communication and service discovery. Of course, there are many places in this simplified version of DemoRpcProxy
that can be optimized, such as:
- Caching the
DemoRpcClient
client object and the correspondingConnection
object, so that they don’t need to be created every time. - Adding a failure retry mechanism to retry the request in case of a timeout.
- Adding more complex and flexible load balancing mechanisms, such as load balancing based on hash value scattering, load balancing based on node load, etc.
If you are interested, you can try to extend it to implement a more complete proxy layer.
Integration with Clients #
After introducing the implementation of Demo RPC, let’s talk about how to use the Demo RPC framework. This involves three classes: Consumer
, DemoServiceImpl
, and Provider
, as well as the DemoService
business interface.
Classes used for integration
First, we define the DemoService
interface as the Server-side business interface. The specific definition is as follows:
public interface DemoService {
String sayHello(String param);
}
The implementation of DemoServiceImpl
for the DemoService
interface is also very simple. It modifies the parameter and returns it as the result:
public class DemoServiceImpl implements DemoService {
public String sayHello(String param) {
return "hello:" + param;
}
}
After understanding the corresponding business interface and implementation, let’s take a look at the implementation of Provider
. It plays a similar role to the Provider in Dubbo. It creates a DemoServiceImpl
business bean and exposes its own address information. The code is shown below:
public class Provider {
public static void main(String[] args) throws Exception {
// Create DemoServiceImpl and register it in BeanManager
BeanManager.registerBean("demoService",
new DemoServiceImpl());
// Create ZookeeperRegistry and register the address information of the Provider as ServerInfo to ZooKeeper
ZookeeperRegistry<ServerInfo> discovery =
new ZookeeperRegistry<>();
discovery.start();
ServerInfo serverInfo = new ServerInfo("127.0.0.1", 20880);
discovery.registerService(
ServiceInstance.<ServerInfo>builder().name("demoService")
.payload(serverInfo).build());
// Start DemoRpcServer and wait for client requests
DemoRpcServer rpcServer = new DemoRpcServer(20880);
rpcServer.start();
}
}
Finally, let’s talk about the consumer. It is similar to the Consumer in Dubbo. It subscribes to the address information of the Providers, then selects a Provider, establishes a connection, sends a request, and gets the response. These processes are encapsulated in the Proxy. The implementation of the Consumer is very simple, as shown in the example code below:
public class Consumer {
public static void main(String[] args) throws Exception {
// Create ZookeeperRegistry object
ZookeeperRegistry<ServerInfo> discovery = new ZookeeperRegistry<>();
// Create proxy object and invoke the remote Server through the proxy
DemoService demoService = DemoRpcProxy.newInstance(DemoService.class, discovery);
// Invoke the sayHello() method and print the result
String result = demoService.sayHello("hello");
System.out.println(result);
}
}
Conclusion #
In this lesson, we first introduced the transport
package in the simplified RPC framework. It implements the communication capabilities of the Server and the client on top of the codec introduced in the previous lesson. Then, we explained how the registry
package interacts with ZooKeeper and enhanced the capability of the simplified RPC framework for service registration and discovery. Next, we analyzed the implementation of the proxy
package, where the complexity of underlying network communication is abstracted away by using JDK dynamic proxy. Finally, we wrote a simple DemoService
business interface, as well as the corresponding Provider
and Consumer
integration with the simplified RPC framework.
At the end of this lesson, I have a little question for you: why does the creation of the EventLoopGroup
in the transport
package differ for Linux systems? I’m looking forward to your comments.
Link to the simplified RPC framework Demo: https://github.com/xxxlxy2008/demo-prc.