27 Dynamic proxy, shielding users from the underlying details of RPC invocation #

Dynamic proxy plays a crucial role in the implementation of RPC frameworks. It helps shield users from the specific details of RPC invocation, such as underlying network communication, service discovery, load balancing, etc., which are not meaningful to users. When using an RPC framework in your project development, you only need to invoke the interface methods and obtain the return results. Have you ever wondered how the RPC framework completes the entire invocation process? In today’s lesson, we will complete the last part of the RPC framework: RPC request invocation and processing, and see how to use the dynamic proxy mechanism to achieve this magical operation.

Source code reference: mini-rpc

Basics of Dynamic Proxy #

Why do we need the proxy pattern? The proxy pattern follows the open-closed principle: a class is open for extension but closed for modification. With a proxy, you can enhance the functionality of the target class without modifying it and without knowing its implementation details. Spring AOP is a classic application of the Java dynamic proxy mechanism. In project development we often use AOP for cross-cutting concerns such as execution-time monitoring, transaction management, and permission checks. All of these are added as aspects, without invasive changes to the source code.

Dynamic proxy is a proxy pattern that provides a mechanism for dynamically constructing proxy classes and dynamically invoking target methods at runtime. It is called dynamic because the relationship between the proxy class and the object being proxied is determined at runtime. The proxy class can be regarded as a wrapper for the object being proxied, and the invocation of the target method is accomplished through the proxy class. Therefore, the proxy pattern can effectively decouple the service provider and service consumer, hiding the specific details of RPC invocation, as shown in the following figure.

Image 1

Next, let’s explore the implementation principle of dynamic proxy and how the commonly used JDK dynamic proxy and Cglib dynamic proxy are used.

JDK Dynamic Proxy #

The implementation of JDK dynamic proxy relies on two core types in the java.lang.reflect package: the InvocationHandler interface and the Proxy class.

  • InvocationHandler Interface

The object being proxied by JDK dynamic proxy must implement one or more interfaces, and the generated proxy class is itself an implementation of those interfaces. Because the proxy class exposes only the interface methods and dispatches them through reflection, it cannot proxy methods that are not declared in an interface. Each dynamic proxy object must be given an implementation of the InvocationHandler interface, which has a single invoke() method. When we invoke a method on the proxy object, the call is forwarded to invoke(), which executes the specific logic. The definition of the invoke() method is as follows:

public interface InvocationHandler {

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable;

}

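The proxy parameter is the proxy instance on which the method was invoked (not the target object being proxied), the method parameter is the interface method that was called on the proxy, and the args parameter holds the arguments passed to that call. Note that args is null when the method takes no parameters.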

  • Proxy Class

The Proxy class can be understood as a factory class that dynamically creates proxy classes. It provides a set of static methods and interfaces for dynamically generating objects and proxy classes. Usually, we only need to use the newProxyInstance() method, whose definition is as follows:

public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) {

    Objects.requireNonNull(h);

    Class<?> caller = System.getSecurityManager() == null ? null : Reflection.getCallerClass();

    Constructor<?> cons = getProxyConstructor(caller, loader, interfaces);

    return newProxyInstance(caller, cons, h);

}

The loader parameter represents the class loader to be used, the interfaces parameter represents the list of interfaces implemented by the proxy class, and you need to provide an implementation of the InvocationHandler interface as a handler. All method invocations of the dynamic proxy class will be handled by this handler, which is the core of dynamic proxy.
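The roles of the three invoke() parameters can be checked with a small experiment. The sketch below is only an illustration: the Greeter interface and the LOG list are hypothetical names introduced here, and the handler answers the calls itself instead of delegating to a target object.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class InvokeArgsDemo {

    // Hypothetical interface, used only for this illustration.
    interface Greeter {
        String greet(String name);
        String ping();
    }

    // Records what the handler observed on each call.
    static final List<String> LOG = new ArrayList<>();

    static Greeter newGreeter() {
        InvocationHandler handler = (proxy, method, args) -> {
            // 'proxy' is the generated proxy instance, not a target object.
            boolean isProxy = Proxy.isProxyClass(proxy.getClass());
            // 'args' is null (not an empty array) for methods without parameters.
            int argCount = (args == null) ? 0 : args.length;
            LOG.add(method.getName() + " isProxy=" + isProxy + " argCount=" + argCount);
            return argCount == 0 ? "pong" : "hello " + args[0];
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[]{Greeter.class},
                handler);
    }

    public static void main(String[] args) {
        Greeter greeter = newGreeter();
        System.out.println(greeter.greet("rpc")); // hello rpc
        System.out.println(greeter.ping());       // pong
        LOG.forEach(System.out::println);
    }
}
```

One practical consequence: calling a method such as proxy.toString() inside invoke() would re-enter the handler, so the sketch inspects the proxy only through Proxy.isProxyClass().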

Next, let’s use a simple example to simulate transaction management of database operations and learn about the specific usage of JDK dynamic proxy. First, we define the interface and implementation class of the User table in the database:

public interface UserDao {

    void insert();

}

public class UserDaoImpl implements UserDao {

    @Override

    public void insert() {

        System.out.println("insert user success.");

    }

}

Next, we implement a transaction management utility class to perform transaction operations before and after executing database operations. The code is as follows:

public class TransactionProxy {

    private Object target;

    public TransactionProxy(Object target) {

        this.target = target;

    }

    public Object genProxyInstance() {

        return Proxy.newProxyInstance(target.getClass().getClassLoader(),

                target.getClass().getInterfaces(),

                (proxy, method, args) -> {

                    System.out.println("start transaction");

                    Object result = method.invoke(target, args);

                    System.out.println("submit transaction");

                    return result;

                });

    }

}

In the genProxyInstance() method, the most important thing is to implement the InvocationHandler interface. Custom behaviors can be extended before and after the execution of methods in the real object to enhance the functionality of the target class. For easier understanding, in the above example, we only print simple console logs. You can see the actual effect of JDK dynamic proxy through the test class:

public class TransactionProxyTest {

    @Test

    public void testProxy() {

        UserDao userDao = new UserDaoImpl();

        UserDao proxyInstance = (UserDao) new TransactionProxy(userDao).genProxyInstance();

        proxyInstance.insert();

    }

}

The program output is as follows:

start transaction
insert user success.
submit transaction
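One detail the transaction example does not cover is failure handling. Method.invoke() wraps any exception thrown by the target in an InvocationTargetException, so a handler that wants callers to see the original exception has to unwrap it. The following is a minimal sketch of that idea, not part of mini-rpc: FailingUserDao, withTransaction(), and the EVENTS list are hypothetical names, and the "rollback" is only recorded as a string.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class RollbackProxyDemo {

    interface UserDao {
        void insert(String name);
    }

    // Hypothetical implementation that fails on empty input.
    static class FailingUserDao implements UserDao {
        @Override
        public void insert(String name) {
            if (name.isEmpty()) {
                throw new IllegalArgumentException("name must not be empty");
            }
        }
    }

    // Records the simulated transaction events in order.
    static final List<String> EVENTS = new ArrayList<>();

    @SuppressWarnings("unchecked")
    static <T> T withTransaction(T target, Class<T> iface) {
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[]{iface},
                (proxy, method, args) -> {
                    EVENTS.add("start transaction");
                    try {
                        Object result = method.invoke(target, args);
                        EVENTS.add("submit transaction");
                        return result;
                    } catch (InvocationTargetException e) {
                        EVENTS.add("rollback transaction");
                        // Rethrow the exception the target actually threw.
                        throw e.getCause();
                    }
                });
    }

    public static void main(String[] args) {
        UserDao dao = withTransaction(new FailingUserDao(), UserDao.class);
        dao.insert("alice");
        try {
            dao.insert("");
        } catch (IllegalArgumentException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
        System.out.println(EVENTS);
    }
}
```

Without the unwrapping step, the caller would receive the reflective wrapper (or an UndeclaredThrowableException for checked exceptions not declared on the interface) rather than the exception raised by the DAO.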

Cglib Dynamic Proxy #

Cglib dynamic proxy is a third-party utility library based on the ASM bytecode generation framework. Compared to JDK dynamic proxy, Cglib dynamic proxy is more flexible. It generates a proxy class using bytecode technology, so the type of the proxy class is not restricted. The target class that is used with Cglib proxy does not need to implement any interfaces, making it non-intrusive.

Cglib dynamic proxy generates a subclass of the specified class using bytecode and overrides its methods to implement the proxy. Because the proxy is a subclass of the target class, the target class must not be declared final, and final methods cannot be intercepted. When the proxy is created with the no-argument Enhancer.create(), the target class also needs an accessible parameterless constructor.
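To make the subclass idea concrete, here is a hand-written stand-in for the kind of class a bytecode library generates at runtime. It is a conceptual sketch in plain Java and does not use Cglib; UserService, UserServiceProxy, and EVENTS are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class SubclassProxySketch {

    // Target class: no interface required, but it must not be final.
    static class UserService {
        public String insert() {
            return "insert user success.";
        }

        // A final method cannot be overridden, so a subclass cannot intercept it.
        public final String version() {
            return "v1";
        }
    }

    static final List<String> EVENTS = new ArrayList<>();

    // Written by hand here; a bytecode library would emit a class like this at runtime.
    static class UserServiceProxy extends UserService {
        @Override
        public String insert() {
            EVENTS.add("before insert");
            String result = super.insert(); // call the original implementation
            EVENTS.add("after insert");
            return result;
        }
    }

    public static void main(String[] args) {
        UserService service = new UserServiceProxy();
        System.out.println(service.insert());  // insert user success.
        System.out.println(service.version()); // v1, not intercepted
        System.out.println(EVENTS);            // [before insert, after insert]
    }
}
```

The call to super.insert() plays the role that invoking the superclass method through MethodProxy plays in a Cglib interceptor.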

Before using Cglib dynamic proxy, we need to import the necessary Maven dependencies as shown below. If your project has already imported the dependencies for spring-core, then the dependencies for Cglib are already included and there is no need to import them again.

<dependency>
    <groupId>cglib</groupId>
    <artifactId>cglib</artifactId>
    <version>3.3.0</version>
</dependency>

Next, let’s reuse the database transaction management example to learn about the specific usage of Cglib dynamic proxy. The UserDao interface and its implementation remain unchanged, but the TransactionProxy class needs to be implemented again, as shown below:

public class CglibTransactionProxy implements MethodInterceptor {

    private Object target;

    public CglibTransactionProxy(Object target) {
        this.target = target;
    }

    public Object genProxyInstance() {
        Enhancer enhancer = new Enhancer();
        enhancer.setSuperclass(target.getClass());
        enhancer.setCallback(this);
        return enhancer.create();
    }

    @Override
    public Object intercept(Object object, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
        System.out.println("start transaction");
        Object result = methodProxy.invokeSuper(object, args);
        System.out.println("submit transaction");
        return result;
    }

}

The implementation of Cglib dynamic proxy requires two core components: the MethodInterceptor interface and the Enhancer class, similar to the InvocationHandler interface and the Proxy class in JDK dynamic proxy.

  • MethodInterceptor interface

The MethodInterceptor interface has only one method, intercept(). All methods of the proxy class will be ultimately redirected to the intercept() method for behavior enhancement. The actual execution logic of the real methods is invoked through the Method or MethodProxy object.

  • Enhancer class

The Enhancer class is the bytecode enhancer in Cglib, and it makes extending the proxied class very convenient. In essence, Enhancer dynamically generates a subclass of the proxied class at runtime and intercepts the methods of that class. We can register a Callback through Enhancer to perform custom behavior before and after the proxied methods execute. MethodInterceptor is the most commonly used Callback type.

The test class for Cglib dynamic proxy is similar to the test class for JDK dynamic proxy, and the program output is the same. The code for the test class is as follows:

public class CglibTransactionProxyTest {

    public static void main(String[] args) {
        UserDao userDao = new UserDaoImpl();
        UserDao proxyInstance = (UserDao) new CglibTransactionProxy(userDao).genProxyInstance();
        proxyInstance.insert();
    }

}

After learning the basics of dynamic proxy, we can easily implement the request invocation and processing in the RPC framework. First, let’s start with how the service consumer initiates an RPC request using dynamic proxy.

Service Consumer Dynamic Proxy Implementation #

In the “Service Publishing and Subscribing: Building the Basic Framework for Producers and Consumers” course, we explained the implementation of the @RpcReference annotation. Through a custom RpcReferenceBean, all method invocations are intercepted. The init() method in RpcReferenceBean was marked as TODO at that time, which is the entry point for creating the proxy object. The creation of the proxy object is as follows:

public class RpcReferenceBean implements FactoryBean<Object> {

    // Other code omitted
    
    public void init() throws Exception {
        RegistryService registryService = RegistryFactory.getInstance(this.registryAddr, RegistryType.valueOf(this.registryType));
        this.object = Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new RpcInvokerProxy(serviceVersion, timeout, registryService));
    }

    // Other code omitted

}

The RpcInvokerProxy handler is the core of the dynamic proxy logic. It encapsulates the details of low-level network communication, service discovery, and load balancing during RPC calls. Let's take a closer look at how the RpcInvokerProxy handler is implemented. The code is as follows:

public class RpcInvokerProxy implements InvocationHandler {

    private final String serviceVersion;

    private final long timeout;

    private final RegistryService registryService;

    public RpcInvokerProxy(String serviceVersion, long timeout, RegistryService registryService) {

        this.serviceVersion = serviceVersion;

        this.timeout = timeout;

        this.registryService = registryService;

    }

    @Override

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        // Construct the RPC protocol object

        MiniRpcProtocol<MiniRpcRequest> protocol = new MiniRpcProtocol<>();

        MsgHeader header = new MsgHeader();

        long requestId = MiniRpcRequestHolder.REQUEST_ID_GEN.incrementAndGet();

        header.setMagic(ProtocolConstants.MAGIC);

        header.setVersion(ProtocolConstants.VERSION);

        header.setRequestId(requestId);

        header.setSerialization((byte) SerializationTypeEnum.HESSIAN.getType());

        header.setMsgType((byte) MsgType.REQUEST.getType());

        header.setStatus((byte) 0x1);

        protocol.setHeader(header);

        MiniRpcRequest request = new MiniRpcRequest();

        request.setServiceVersion(this.serviceVersion);

        request.setClassName(method.getDeclaringClass().getName());

        request.setMethodName(method.getName());

        request.setParameterTypes(method.getParameterTypes());

        request.setParams(args);

        protocol.setBody(request);

        RpcConsumer rpcConsumer = new RpcConsumer();

        MiniRpcFuture<MiniRpcResponse> future = new MiniRpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()), timeout);

        MiniRpcRequestHolder.REQUEST_MAP.put(requestId, future);

        // Initiate the RPC remote call

        rpcConsumer.sendRequest(protocol, this.registryService);

        // Wait for the RPC call to execute and get the result

        return future.getPromise().get(future.getTimeout(), TimeUnit.MILLISECONDS).getData();

    }

}

The RpcInvokerProxy handler must implement the invoke() method of the InvocationHandler interface. When the RPC interface being proxied executes a method call, it will be forwarded to the invoke() method. The core process of the invoke() method mainly consists of three steps: constructing the RPC protocol object, initiating the RPC remote call, and waiting for the RPC call to execute and get the result.
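The first of these steps relies entirely on what the Method object exposes. The sketch below isolates that part: a handler that only captures the call metadata and returns a canned value, with no network involved. HelloFacade, RequestInfo, and lastRequest are hypothetical names; RequestInfo is a simplified stand-in for MiniRpcRequest, not the real class.

```java
import java.lang.reflect.Proxy;

class RequestCaptureDemo {

    // Hypothetical service interface.
    interface HelloFacade {
        String hello(String name);
    }

    // Simplified stand-in for MiniRpcRequest (assumption, not the real class).
    static class RequestInfo {
        final String className;
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] params;

        RequestInfo(String className, String methodName, Class<?>[] parameterTypes, Object[] params) {
            this.className = className;
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.params = params;
        }
    }

    // The most recent call captured by the stub.
    static RequestInfo lastRequest;

    static HelloFacade newStub() {
        return (HelloFacade) Proxy.newProxyInstance(
                HelloFacade.class.getClassLoader(),
                new Class<?>[]{HelloFacade.class},
                (proxy, method, args) -> {
                    // Everything the provider needs to locate the method comes from 'method' and 'args'.
                    lastRequest = new RequestInfo(
                            method.getDeclaringClass().getName(),
                            method.getName(),
                            method.getParameterTypes(),
                            args);
                    return "stubbed"; // a real handler would send the request and await the response
                });
    }

    public static void main(String[] args) {
        HelloFacade facade = newStub();
        System.out.println(facade.hello("mini rpc")); // stubbed
        System.out.println(lastRequest.className + "." + lastRequest.methodName);
    }
}
```

Calls to toString(), hashCode(), and equals() on the proxy are also routed to invoke(), with java.lang.Object as the declaring class, so a production handler may want to answer those locally instead of sending them over the network.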

To construct the RPC protocol object, simply assign the attributes of the MiniRpcProtocol class in order according to the user-configured interface parameters. After constructing the MiniRpcProtocol object, we can initiate the RPC call to the remote service node, so the sendRequest() method is what we need to focus on implementing.

public void sendRequest(MiniRpcProtocol<MiniRpcRequest> protocol, RegistryService registryService) throws Exception {
    MiniRpcRequest request = protocol.getBody();
    Object[] params = request.getParams();
    String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());

    // params is null when the invoked method takes no arguments, so guard before indexing
    int invokerHashCode = (params != null && params.length > 0 && params[0] != null)
            ? params[0].hashCode() : serviceKey.hashCode();
    ServiceMeta serviceMetadata = registryService.discovery(serviceKey, invokerHashCode);

    if (serviceMetadata != null) {
        // Connect to the selected service node and wait for the connection to complete
        ChannelFuture future = bootstrap.connect(serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort()).sync();
        future.addListener((ChannelFutureListener) arg0 -> {
            if (future.isSuccess()) {
                log.info("connect rpc server {} on port {} success.", serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort());
            } else {
                log.error("connect rpc server {} on port {} failed.", serviceMetadata.getServiceAddr(), serviceMetadata.getServicePort());
                future.cause().printStackTrace();
                eventLoopGroup.shutdownGracefully();
            }
        });
        // Send the request over the established channel
        future.channel().writeAndFlush(protocol);
    }
}

Before making an RPC call, we need to find the most suitable service node. We call the discovery() method of the registry service RegistryService directly to perform service discovery, which uses the consistent hash algorithm by default. Here is a small trick: to distribute request traffic across all service nodes as evenly as possible, you need to provide an invokerHashCode to discovery(), generally the hashCode of the first parameter in the RPC method's parameter list. After finding the address of the service node, we establish a TCP connection using Netty and then call the writeAndFlush() method to send the data to the remote service node.
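How an invokerHashCode can be turned into a node choice is easiest to see on a small hash ring. The sketch below is an assumption about how such a selection might work, not the registry code from mini-rpc: the class name, the number of virtual nodes, and the use of String.hashCode() for ring positions are all illustrative choices.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ConsistentHashSketch {

    // Number of virtual nodes per real node (assumed value for illustration).
    private static final int VIRTUAL_NODES = 10;

    // Ring position -> service address, kept sorted by position.
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    ConsistentHashSketch(List<String> nodes) {
        for (String node : nodes) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put((node + "#" + i).hashCode(), node);
            }
        }
    }

    // Picks the first node at or after the given hash, wrapping around to the start of the ring.
    String select(int invokerHashCode) {
        if (ring.isEmpty()) {
            return null;
        }
        Map.Entry<Integer, String> entry = ring.ceilingEntry(invokerHashCode);
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        ConsistentHashSketch ring = new ConsistentHashSketch(
                List.of("10.0.0.1:2781", "10.0.0.2:2781", "10.0.0.3:2781"));
        // The same first argument always lands on the same node.
        System.out.println(ring.select("user-42".hashCode()));
        System.out.println(ring.select("user-42".hashCode()));
    }
}
```

The useful property is stability: when a node is removed, only the keys that were mapped to that node move, while all other keys keep their previous node.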

Returning to the main process of the invoke() method, how do we wait for the call result after making a remote RPC call? In the course “Remote Communication: Communication Protocol Design and Encoding/Decoding Implementation,” we introduced how to use the Promise tool provided by Netty to achieve synchronous waiting for RPC requests. The Promise pattern is essentially an asynchronous programming model. We can get a credential for viewing the task execution result without waiting for the task to complete. When we need to get the task execution result, we can use the relevant interface provided by the credential to obtain it.
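The request-id-to-future bookkeeping behind this waiting step can be sketched with the JDK alone. The example below swaps in CompletableFuture for Netty's Promise so that it runs without dependencies; PendingRequests and its methods are hypothetical names, and the response payload is simplified to a String.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequests {

    private static final AtomicLong REQUEST_ID_GEN = new AtomicLong(0);
    private static final Map<Long, CompletableFuture<String>> REQUEST_MAP = new ConcurrentHashMap<>();

    static long nextRequestId() {
        return REQUEST_ID_GEN.incrementAndGet();
    }

    // Caller side: register the future before the request is written to the network.
    static CompletableFuture<String> register(long requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        REQUEST_MAP.put(requestId, future);
        return future;
    }

    // I/O side: complete the matching future when the response for requestId arrives.
    static void onResponse(long requestId, String data) {
        CompletableFuture<String> future = REQUEST_MAP.remove(requestId);
        if (future != null) {
            future.complete(data);
        }
    }

    // Caller side: block until the response arrives or the timeout elapses.
    static String awaitResult(long requestId, CompletableFuture<String> future, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            REQUEST_MAP.remove(requestId); // avoid leaking entries for timed-out requests
        }
    }

    static int pendingCount() {
        return REQUEST_MAP.size();
    }

    public static void main(String[] args) throws Exception {
        long requestId = nextRequestId();
        CompletableFuture<String> future = register(requestId);
        // Simulate the I/O thread delivering the response a little later.
        new Thread(() -> onResponse(requestId, "hello mini rpc")).start();
        System.out.println(awaitResult(requestId, future, 1000)); // hello mini rpc
    }
}
```

Registering the future before sending matters: if the response could arrive before the entry exists in the map, the reply would be dropped and the caller would wait until the timeout.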

After the service provider receives an RPC request, how should it execute the real method call? Let’s continue to see how the service provider handles RPC requests.

Implementation of Service Provider Reflective Invocation #

In the course “Remote Communication: Communication Protocol Design and Encoding/Decoding Implementation,” we have already introduced the handler for the service provider. After the RPC request data is decoded into a MiniRpcProtocol object by MiniRpcDecoder, it is then handed over to RpcRequestHandler to execute the RPC request call. Let’s first review the processing logic of the channelRead0() method in RpcRequestHandler:

@Slf4j

public class RpcRequestHandler extends SimpleChannelInboundHandler<MiniRpcProtocol<MiniRpcRequest>> {

    private final Map<String, Object> rpcServiceMap;

    public RpcRequestHandler(Map<String, Object> rpcServiceMap) {

        this.rpcServiceMap = rpcServiceMap;

    }

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, MiniRpcProtocol<MiniRpcRequest> protocol) {

        RpcRequestProcessor.submitRequest(() -> {

            MiniRpcProtocol<MiniRpcResponse> resProtocol = new MiniRpcProtocol<>();

            MiniRpcResponse response = new MiniRpcResponse();

            MsgHeader header = protocol.getHeader();

            header.setMsgType((byte) MsgType.RESPONSE.getType());

            try {

                Object result = handle(protocol.getBody());

                response.setData(result);

                header.setStatus((byte) MsgStatus.SUCCESS.getCode());

            } catch (Throwable throwable) {

                header.setStatus((byte) MsgStatus.FAIL.getCode());

                response.setMessage(throwable.toString());

                log.error("process request {} error", header.getRequestId(), throwable);

            }

            // Populate the response in both the success and the failure case

            resProtocol.setHeader(header);

            resProtocol.setBody(response);

            ctx.writeAndFlush(resProtocol);

        });

    }

}

Because RPC request calls are time-consuming, the recommended practice is to submit RPC requests to a custom business thread pool for execution. The handle() method is the place where the RPC call is actually performed, and it needs to be implemented in this lesson. The implementation of the handle() method is as follows:

private Object handle(MiniRpcRequest request) throws Throwable {

    String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());

    Object serviceBean = rpcServiceMap.get(serviceKey);

    if (serviceBean == null) {

        throw new RuntimeException(String.format("service not exist: %s:%s", request.getClassName(), request.getMethodName()));

    }

    Class<?> serviceClass = serviceBean.getClass();

    String methodName = request.getMethodName();

    Class<?>[] parameterTypes = request.getParameterTypes();

    Object[] parameters = request.getParams();

    FastClass fastClass = FastClass.create(serviceClass);

    int methodIndex = fastClass.getIndex(methodName, parameterTypes);

    return fastClass.invoke(methodIndex, serviceBean, parameters);

}

The rpcServiceMap stores the service beans published by the service provider, keyed by service name and version, so we can look up the bean that implements the requested interface. With the service bean, method name, parameter type list, and argument list, we could simply use reflection to execute the method call. To speed up service invocation, we instead use the FastClass mechanism provided by Cglib to call the method directly. The MethodProxy object in Cglib is itself built on FastClass: it does the same job as a Method object but is designed to avoid the overhead of reflective invocation.
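For comparison, the plain reflection route mentioned above might look like the following. This is a sketch under assumptions rather than the mini-rpc code: the "#" separator in buildServiceKey(), the register() helper, and the HelloFacade types are hypothetical.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class ReflectiveDispatchSketch {

    public interface HelloFacade {
        String hello(String name);
    }

    public static class HelloFacadeImpl implements HelloFacade {
        @Override
        public String hello(String name) {
            return "hello " + name;
        }
    }

    private final Map<String, Object> rpcServiceMap = new HashMap<>();

    // Key format is an assumption made for this illustration.
    static String buildServiceKey(String className, String version) {
        return className + "#" + version;
    }

    void register(Class<?> iface, String version, Object bean) {
        rpcServiceMap.put(buildServiceKey(iface.getName(), version), bean);
    }

    // Looks up the bean, resolves the method by name and parameter types, then invokes it reflectively.
    Object handle(String className, String version, String methodName,
                  Class<?>[] parameterTypes, Object[] params) throws Exception {
        Object serviceBean = rpcServiceMap.get(buildServiceKey(className, version));
        if (serviceBean == null) {
            throw new IllegalStateException("service not exist: " + className + ":" + methodName);
        }
        Method method = serviceBean.getClass().getMethod(methodName, parameterTypes);
        return method.invoke(serviceBean, params);
    }

    public static void main(String[] args) throws Exception {
        ReflectiveDispatchSketch provider = new ReflectiveDispatchSketch();
        provider.register(HelloFacade.class, "1.0.0", new HelloFacadeImpl());
        Object result = provider.handle(HelloFacade.class.getName(), "1.0.0",
                "hello", new Class<?>[]{String.class}, new Object[]{"mini rpc"});
        System.out.println(result); // hello mini rpc
    }
}
```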

The FastClass mechanism does not invoke the target method through reflection. Instead, it dynamically generates a subclass of FastClass at runtime that contains the logic to call the target class's methods directly. The generated subclass assigns an int index to each method of the target class, and FastClass uses that index to locate the method that needs to be called.
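The index-based dispatch idea can be illustrated with a hand-written class that mimics what such a generated subclass might contain. This is a conceptual sketch only: it does not use Cglib, and HelloService and HelloServiceFastClass are hypothetical names. The real generated code is produced as bytecode at runtime.

```java
import java.util.Arrays;

class IndexDispatchSketch {

    static class HelloService {
        public String hello(String name) {
            return "hello " + name;
        }

        public int add(int a, int b) {
            return a + b;
        }
    }

    // Hand-written stand-in for a class that would normally be generated at runtime.
    static class HelloServiceFastClass {

        // Maps a method signature to its index; returns -1 if there is no match.
        int getIndex(String name, Class<?>[] parameterTypes) {
            if (name.equals("hello") && Arrays.equals(parameterTypes, new Class<?>[]{String.class})) {
                return 0;
            }
            if (name.equals("add") && Arrays.equals(parameterTypes, new Class<?>[]{int.class, int.class})) {
                return 1;
            }
            return -1;
        }

        // Dispatches on the index with ordinary method calls; no reflection is involved.
        Object invoke(int index, Object target, Object[] args) {
            HelloService service = (HelloService) target;
            switch (index) {
                case 0:
                    return service.hello((String) args[0]);
                case 1:
                    return service.add((Integer) args[0], (Integer) args[1]);
                default:
                    throw new IllegalArgumentException("unknown method index: " + index);
            }
        }
    }

    public static void main(String[] args) {
        HelloServiceFastClass fastClass = new HelloServiceFastClass();
        int index = fastClass.getIndex("add", new Class<?>[]{int.class, int.class});
        System.out.println(fastClass.invoke(index, new HelloService(), new Object[]{1, 2})); // 3
    }
}
```

Because the index lookup and the dispatch are separate steps, a caller that invokes the same method repeatedly could resolve the index once and reuse it.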

At this point, we have completed the prototype of the entire RPC framework. You can first start the ZooKeeper server locally, and then start the rpc-provider and rpc-consumer modules and initiate the test using an HTTP request, as shown below:

$ curl http://localhost:8080/hello

hellomini rpc

Summary #

In this lesson, we introduced the basic principles of dynamic proxy and used dynamic proxy technology to complete the invocation and handling of RPC requests. Dynamic proxy is one of the core technologies of RPC frameworks and an important point for performance optimization. Which dynamic proxy technology to use depends on the specific scenario, so measure rather than assume and run proper performance tests when selecting a technology. For example, JDK dynamic proxy has become faster since JDK 1.8, but it can still only proxy interfaces. Javassist bytecode generation performs better at invocation time than JDK dynamic proxy and Cglib dynamic proxy, but Javassist is slower at generating the dynamic proxy classes.

Here are two tasks for further study:

  • The Dubbo framework uses Javassist as the default implementation for dynamic proxy functionality. As an exercise, try replacing the JDK dynamic proxy with a Javassist implementation.
  • The service consumer establishes a TCP connection every time it makes an RPC call. Do you know how to optimize this?