24 Service Publishing and Subscription, Building the Foundation of Producer and Consumer Structures

Starting with this lesson, we will develop a complete prototype of an RPC framework. By working through the whole hands-on course, you will become familiar with how RPC is implemented and deepen your understanding of the Netty fundamentals covered earlier. You can also apply this knowledge in your own work.

I will walk through the implementation of a general-purpose RPC framework from four aspects: service publishing and subscription, remote communication, service governance, and dynamic proxies. If you stick with the course to the end, developing projects independently at work should become easier. Eager to get started? Let’s begin!

Source code reference: mini-rpc

Environment Setup #

To do a good job, one must first sharpen one’s tools. First, we need to set up our development environment, which is a must-have skill for every programmer. Here is my local environment checklist for reference.

  • Operating system: macOS Big Sur 11.0.1.
  • Integrated Development Environment (IDE): IntelliJ IDEA 2020.3; Eclipse works as well.
  • Technology stack: Spring Boot 2.1.12.RELEASE + JDK 1.8.0_221 + Netty 4.1.42.Final.
  • Project dependency management tool: Maven 3.5.4. You can install Maven separately or use the version bundled with IDEA. If you install it separately, you need to configure the MAVEN_HOME and PATH environment variables.
  • Registry center: ZooKeeper 3.4.14. Note that the ZooKeeper and Apache Curator versions must be matched: ZooKeeper 3.4.x has to be paired with Apache Curator 2.x.x.

Project Structure #

Before starting the development, we need to have a clear idea of the project structure. According to the RPC framework design architecture introduced in the previous lesson, we can divide the project structure into several modules.

Lark20210106-113815.png

What role does each module play? Let’s go through them one by one.

  • rpc-provider: the service provider. Responsible for publishing RPC services and receiving and processing RPC requests.
  • rpc-consumer: the service consumer. Initiates remote RPC calls through a dynamic proxy, hiding the details of the underlying network communication from the user.
  • rpc-registry: the registry module. Provides basic functions such as service registration, service discovery, and load balancing.
  • rpc-protocol: the network communication module. Includes the encoder and decoder of RPC protocol, serialization and deserialization tools, etc.
  • rpc-core: the basic library. Provides common utility classes and model definitions, such as RPC request and response classes, RPC service metadata classes, etc.
  • rpc-facade: the RPC service interface. Contains interfaces that need to be exposed by the service provider. This module is mainly used for testing RPC calls that simulate real scenarios.

As the following figure shows, we first need to understand the dependencies between the modules in order to organize the Maven pom definitions properly. rpc-core is the most basic library, so most modules depend on it. rpc-consumer makes RPC calls, and rpc-provider handles RPC requests. If the address of the remote service is unknown, no call can take place. Therefore, both modules depend on the service registration and discovery capabilities provided by rpc-registry.

Lark20210106-113819.png

How to Use #

Instead of rushing to start implementing the code details, let’s consider a question first. How should the final RPC framework be used by users? This is similar to learning a new technology. You can’t immediately delve into the details of the source code. Instead, you should first familiarize yourself with its basic usage, then find the key entry point and study the implementation principles in depth. This approach will achieve the best results for your efforts.

First, let’s look at the desired effect of the RPC framework, as shown below:

// rpc-facade # HelloFacade
public interface HelloFacade {
    String hello(String name);
}

// rpc-provider # HelloFacadeImpl
@RpcService(serviceInterface = HelloFacade.class, serviceVersion = "1.0.0")
public class HelloFacadeImpl implements HelloFacade {
    @Override
    public String hello(String name) {
        return "hello" + name;
    }
}

// rpc-consumer # HelloController
@RestController
public class HelloController {
    @RpcReference(serviceVersion = "1.0.0", timeout = 3000)
    private HelloFacade helloFacade;

    @RequestMapping(value = "/hello", method = RequestMethod.GET)
    public String sayHello() {
        return helloFacade.hello("mini rpc");
    }
}

To facilitate local simulation of the client and server, I will make rpc-provider and rpc-consumer modules start independently. rpc-provider exposes the RPC service HelloFacade using the @RpcService annotation, and rpc-consumer refers to the HelloFacade service and initiates the RPC call using the @RpcReference annotation. This is basically consistent with the usage of commonly used RPC frameworks.

After understanding the project structure and overall implementation ideas, let’s start development from the service provider.

Publishing Services by Service Provider #

What does the service provider rpc-provider need to accomplish? It mainly involves four core processes:

  • The service provider starts the service and exposes the service port.
  • When starting up, the provider scans the services that need to be published and publishes the service metadata information to the registry.
  • The provider receives each RPC request and decodes it to obtain the request message.
  • The provider submits the request to a custom thread pool for processing and writes the result back to the client.

In this lesson, we will first implement the first two processes for the rpc-provider module.
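Although the last two processes are implemented in later lessons, the hand-off described in the fourth one can be sketched with the JDK alone. The following is only an illustration under stated assumptions: the class name RequestDispatchSketch, the pool sizes, and the echo-style handle method are hypothetical and are not taken from mini-rpc.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: names and pool sizes are illustrative, not taken from mini-rpc.
class RequestDispatchSketch {

    // A dedicated business pool keeps slow service calls off the I/O threads.
    private final ExecutorService businessPool = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));

    // Stand-in for invoking the real service; here it only echoes the request.
    private String handle(String request) {
        return "response to " + request;
    }

    // Runs the handler on the business pool, then passes the result to writeBack.
    CompletableFuture<Void> dispatch(String request, Consumer<String> writeBack) {
        return CompletableFuture
                .supplyAsync(() -> handle(request), businessPool)
                .thenAccept(writeBack);
    }

    // The pool threads are non-daemon, so the pool must be shut down explicitly.
    void shutdown() {
        businessPool.shutdown();
    }

    public static void main(String[] args) {
        RequestDispatchSketch dispatcher = new RequestDispatchSketch();
        List<String> written = new CopyOnWriteArrayList<>();
        dispatcher.dispatch("hello", written::add).join();
        System.out.println(written);
        dispatcher.shutdown();
    }
}
```

In a Netty-based provider, the writeBack callback would presumably write the response to the channel; here it simply collects results in a list so the flow can be observed.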

Starting the Service Provider #

The startup configuration of the service provider follows a largely fixed pattern and is built around Netty’s bootstrap class. You can review the basic course “03 The Role of Bootstrap: What Do Both the Client and Server Do When Starting?”. First, let’s take a look at how the service provider is started, as shown in the following code:

private void startRpcServer() throws Exception {
    this.serverAddress = InetAddress.getLocalHost().getHostAddress();
    EventLoopGroup boss = new NioEventLoopGroup();
    EventLoopGroup worker = new NioEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // Assumed completion: the original listing is cut off here.
                        // Codec and request handlers are added in a later lesson.
                    }
                });
        // Assumed completion: standard Netty pattern of binding, then blocking until the channel closes
        ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();
        channelFuture.channel().closeFuture().sync();
    } finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}

// Fragment of the RpcProvider class; the class header is assumed from the description below
public class RpcProvider implements BeanPostProcessor {

    // Omitted other code

    @Resource
    private RpcProperties rpcProperties;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        // If the bean is annotated with @RpcService, register it with the registry center
        if (bean.getClass().isAnnotationPresent(RpcService.class)) {
            RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
            String serviceName = rpcService.serviceInterface().getName();
            String serviceVersion = rpcService.serviceVersion();
            String serviceAddr = rpcProperties.getServiceAddr() + ":" + rpcProperties.getServicePort();
            serviceRegistry.register(serviceName, serviceVersion, serviceAddr);
        }
        return bean;
    }

    // Omitted other code

    private final Map<String, Object> rpcServiceMap = new HashMap<>();

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
        if (rpcService != null) {
            String serviceName = rpcService.serviceInterface().getName();
            String serviceVersion = rpcService.serviceVersion();
            try {
                ServiceMeta serviceMeta = new ServiceMeta();
                serviceMeta.setServiceAddr(serverAddress);
                serviceMeta.setServicePort(serverPort);
                serviceMeta.setServiceName(serviceName);
                serviceMeta.setServiceVersion(serviceVersion);
                // TODO Publish service metadata to registry center
                rpcServiceMap.put(RpcServiceHelper.buildServiceKey(serviceMeta.getServiceName(), serviceMeta.getServiceVersion()), bean);
            } catch (Exception e) {
                log.error("failed to register service {}#{}", serviceName, serviceVersion, e);
            }
        }
        return bean;
    }
}

RpcProvider overrides the postProcessAfterInitialization method of the BeanPostProcessor interface, which Spring calls for every bean after it has been initialized. If a bean carries the @RpcService annotation, the service metadata is read from the annotation and a ServiceMeta object is constructed. The next step is to publish that metadata to the registry center; we skip the registry implementation for now and cover it in a separate lesson. In addition, RpcProvider maintains an rpcServiceMap that stores the initialized bean for each service. rpcServiceMap acts as a cache: when an RPC request arrives, the corresponding service bean can be retrieved from it directly and invoked.
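The role of rpcServiceMap can be illustrated without Spring. The sketch below re-declares a stand-in @RpcService annotation and a HelloFacade service so that it is self-contained, registers annotated objects under a service key, and later looks a bean up and invokes it reflectively. The class ProviderRegistrySketch and the "name#version" key format are assumptions: the lesson does not show the body of RpcServiceHelper.buildServiceKey.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Stand-in for the lesson's @RpcService annotation, declared here so the sketch compiles alone.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface RpcService {
    Class<?> serviceInterface();
    String serviceVersion() default "1.0";
}

interface HelloFacade {
    String hello(String name);
}

@RpcService(serviceInterface = HelloFacade.class, serviceVersion = "1.0.0")
class HelloFacadeImpl implements HelloFacade {
    @Override
    public String hello(String name) {
        return "hello " + name;
    }
}

// Hypothetical class: mimics the annotation check of RpcProvider without a Spring container.
class ProviderRegistrySketch {
    private final Map<String, Object> rpcServiceMap = new HashMap<>();

    // Assumed key format; the real RpcServiceHelper may build the key differently.
    static String buildServiceKey(String serviceName, String serviceVersion) {
        return serviceName + "#" + serviceVersion;
    }

    // Caches the bean only if its class carries @RpcService.
    void register(Object bean) {
        RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
        if (rpcService != null) {
            String key = buildServiceKey(rpcService.serviceInterface().getName(), rpcService.serviceVersion());
            rpcServiceMap.put(key, bean);
        }
    }

    // Looks up the cached bean and invokes the requested method reflectively.
    Object invoke(String serviceKey, String methodName, Class<?>[] parameterTypes, Object[] args) {
        Object bean = rpcServiceMap.get(serviceKey);
        if (bean == null) {
            throw new IllegalStateException("service not found: " + serviceKey);
        }
        try {
            Method method = bean.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(bean, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("failed to invoke " + serviceKey + "." + methodName, e);
        }
    }

    public static void main(String[] args) {
        ProviderRegistrySketch registry = new ProviderRegistrySketch();
        registry.register(new HelloFacadeImpl());
        String key = buildServiceKey(HelloFacade.class.getName(), "1.0.0");
        System.out.println(registry.invoke(key, "hello", new Class<?>[]{String.class}, new Object[]{"mini rpc"}));
    }
}
```

Including the version in the key is what would let several versions of the same interface be published side by side.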

After understanding how the service provider handles the @RpcService annotation, it becomes much easier to implement the service consumer.

Service Consumer Subscribes to Services #

Unlike the service provider, the service consumer is not a resident service. It only chooses which remote service to send data to when making an RPC call. Therefore, the implementation of the service consumer is more complex. For the member variables declared with the @RpcReference annotation, we need to construct a bean that can actually make RPC calls and then register it with the Spring container.

First, let’s look at the definition of the @RpcReference annotation:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface RpcReference {
    String serviceVersion() default "1.0";

    String registryType() default "ZOOKEEPER";

    String registryAddress() default "127.0.0.1:2181";

    long timeout() default 5000;
}

The @RpcReference annotation provides four attributes: serviceVersion, registryType, registryAddress, and timeout. Next, we need to use these attributes to construct a custom bean and intercept all methods executed by the bean.

Spring’s FactoryBean interface helps us build custom beans. A FactoryBean is a special kind of bean that acts as a factory: when the bean is requested from the container, Spring returns the object produced by its getObject() method rather than the FactoryBean instance itself.

public class RpcReferenceBean implements FactoryBean<Object> {

    private Class<?> interfaceClass;
    private String serviceVersion;
    private String registryType;
    private String registryAddr;
    private long timeout;
    private Object object;

    @Override
    public Object getObject() throws Exception {
        return object;
    }

    @Override
    public Class<?> getObjectType() {
        return interfaceClass;
    }

    public void init() throws Exception {
        // TODO Generate dynamic proxy object and assign it to object
    }

    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    public void setServiceVersion(String serviceVersion) {
        this.serviceVersion = serviceVersion;
    }

    public void setRegistryType(String registryType) {
        this.registryType = registryType;
    }

    public void setRegistryAddr(String registryAddr) {
        this.registryAddr = registryAddr;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }
}

In the RpcReferenceBean class, the init() method is marked with a TODO. This is where the dynamic proxy object has to be generated; RPC calls are then made through that proxy. Users who subscribe to a service with @RpcReference do not need to know the details of the underlying invocation. RPC communication and service addressing are implemented in the dynamic proxy class, which will be explained in detail later.
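To make the idea of a dynamic proxy concrete before the real implementation arrives, here is a minimal sketch that uses the JDK's java.lang.reflect.Proxy. It is not the course's implementation: StubInvocationHandler and ProxySketch are hypothetical names, and instead of performing network communication the handler only returns a string describing the call it intercepted.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

interface HelloFacade {
    String hello(String name);
}

// Hypothetical handler: a real one would serialize the call and send it over the network.
class StubInvocationHandler implements InvocationHandler {
    private final Class<?> interfaceClass;
    private final String serviceVersion;
    private final long timeout;

    StubInvocationHandler(Class<?> interfaceClass, String serviceVersion, long timeout) {
        this.interfaceClass = interfaceClass;
        this.serviceVersion = serviceVersion;
        this.timeout = timeout;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Methods inherited from Object (toString, hashCode, equals) are answered locally.
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this, args);
        }
        // This stub only supports methods that return String; it describes the intercepted call.
        return String.format("%s#%s.%s%s timeout=%dms",
                interfaceClass.getSimpleName(), serviceVersion, method.getName(),
                Arrays.toString(args), timeout);
    }
}

class ProxySketch {
    // Creates a proxy that implements interfaceClass and routes every call to the handler.
    static <T> T create(Class<T> interfaceClass, String serviceVersion, long timeout) {
        Object proxy = Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new StubInvocationHandler(interfaceClass, serviceVersion, timeout));
        return interfaceClass.cast(proxy);
    }

    public static void main(String[] args) {
        HelloFacade helloFacade = create(HelloFacade.class, "1.0.0", 3000);
        // prints "HelloFacade#1.0.0.hello[mini rpc] timeout=3000ms"
        System.out.println(helloFacade.hello("mini rpc"));
    }
}
```

The caller only sees a HelloFacade, which is the property that lets init() hide service addressing and communication behind the interface.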

With the @RpcReference annotation and RpcReferenceBean, we can use Spring’s extension point BeanFactoryPostProcessor to modify the bean definition. Both BeanFactoryPostProcessor and BeanPostProcessor are core extension points of Spring. What is the difference between them? BeanFactoryPostProcessor is executed after the Spring container loads the bean definition and before the bean is instantiated. Therefore, BeanFactoryPostProcessor can obtain the configuration metadata of the bean before it is instantiated and allow users to modify it. On the other hand, BeanPostProcessor is executed before and after the initialization of the bean, and it cannot modify the configuration information of the bean.

Now we need to construct RpcReferenceBean for the member variables declared with the @RpcReference annotation. Therefore, we need to implement BeanFactoryPostProcessor to modify the bean definition. The specific implementation is as follows:

@Component
@Slf4j
public class RpcConsumerPostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {

    private ApplicationContext context;
    private ClassLoader classLoader;
    private final Map<String, BeanDefinition> rpcRefBeanDefinitions = new LinkedHashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
            String beanClassName = beanDefinition.getBeanClassName();
            if (beanClassName != null) {
                Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);
                ReflectionUtils.doWithFields(clazz, this::parseRpcReference);
            }
        }

        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
        this.rpcRefBeanDefinitions.forEach((beanName, beanDefinition) -> {
            if (context.containsBean(beanName)) {
                throw new IllegalArgumentException("spring context already has a bean named " + beanName);
            }
            registry.registerBeanDefinition(beanName, rpcRefBeanDefinitions.get(beanName));
            log.info("registered RpcReferenceBean {} success.", beanName);
        });
    }

    private void parseRpcReference(Field field) {
        RpcReference annotation = AnnotationUtils.getAnnotation(field, RpcReference.class);
        if (annotation != null) {
            BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(RpcReferenceBean.class);
            builder.setInitMethodName(RpcConstants.INIT_METHOD_NAME);
            builder.addPropertyValue("interfaceClass", field.getType());
            builder.addPropertyValue("serviceVersion", annotation.serviceVersion());
            builder.addPropertyValue("registryType", annotation.registryType());
            builder.addPropertyValue("registryAddr", annotation.registryAddress());
            builder.addPropertyValue("timeout", annotation.timeout());
            BeanDefinition beanDefinition = builder.getBeanDefinition();
            rpcRefBeanDefinitions.put(field.getName(), beanDefinition);
        }
    }
}

The RpcConsumerPostProcessor class overrides the postProcessBeanFactory method of BeanFactoryPostProcessor. It retrieves all bean definitions from the beanFactory and checks every field of every bean class for the @RpcReference annotation. If a field is annotated with @RpcReference, it builds an RpcReferenceBean definition with BeanDefinitionBuilder and sets the values of interfaceClass, serviceVersion, registryType, registryAddr, and timeout. Once the definitions have been built, it registers them with the Spring container, using the field name as the bean name.
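The field scan at the heart of parseRpcReference can be tried out with plain reflection. The following sketch re-declares a stand-in @RpcReference (without the Spring @Autowired meta-annotation) and collects, for each annotated field, the same property names that the lesson passes to BeanDefinitionBuilder. ReferenceScanSketch is a hypothetical helper, and the nested map merely stands in for a BeanDefinition.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for the lesson's @RpcReference annotation, minus the Spring meta-annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface RpcReference {
    String serviceVersion() default "1.0";
    String registryType() default "ZOOKEEPER";
    String registryAddress() default "127.0.0.1:2181";
    long timeout() default 5000;
}

interface HelloFacade {
    String hello(String name);
}

class HelloController {
    @RpcReference(serviceVersion = "1.0.0", timeout = 3000)
    private HelloFacade helloFacade;

    private String unrelated; // not annotated, so the scan ignores it
}

// Hypothetical helper: a plain map stands in for Spring's BeanDefinition.
class ReferenceScanSketch {

    // Returns field name -> property values for every @RpcReference field, including inherited ones.
    static Map<String, Map<String, Object>> scan(Class<?> clazz) {
        Map<String, Map<String, Object>> definitions = new LinkedHashMap<>();
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                RpcReference annotation = field.getAnnotation(RpcReference.class);
                if (annotation == null) {
                    continue;
                }
                Map<String, Object> properties = new LinkedHashMap<>();
                properties.put("interfaceClass", field.getType());
                properties.put("serviceVersion", annotation.serviceVersion());
                properties.put("registryType", annotation.registryType());
                properties.put("registryAddr", annotation.registryAddress());
                properties.put("timeout", annotation.timeout());
                definitions.put(field.getName(), properties);
            }
        }
        return definitions;
    }

    public static void main(String[] args) {
        System.out.println(scan(HelloController.class));
    }
}
```

Because the key is the field name, two fields with the same name in different classes would map to the same entry, which is worth keeping in mind when naming @RpcReference fields.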

So far, we have built the basic framework for the service provider and service consumer, and we have introduced in detail how the service provider uses the @RpcService annotation to publish services. The service consumer needs an annotation @RpcReference to inject the service interface. Any member variables annotated with @RpcReference will be constructed into RpcReferenceBean, and a dynamic proxy class will be generated for it. We will continue to explore in depth later.

Summary #

In this lesson, we introduced the implementation principle of service publishing and subscription and built the basic framework for the service provider and service consumer. It can be seen that if we use Java to implement the core logic of RPC framework’s service publishing and subscription, we need to have a solid foundation in the Spring framework. Understanding important extension interfaces of Spring can help us develop more elegant code.

Two homework assignments:

  1. In this lesson, I left several TODOs. Can you start from these TODOs and think about the overall framework of the RPC framework?
  2. Review the fundamental knowledge of Netty’s custom handlers, ChannelHandler, and encoding/decoding. In the next lesson, we will complete the network communication part of the RPC framework.