24 Starting From Protocol Overview of Service Exposure and Service Invocation Full Process Above

24 Starting From Protocol Overview of Service Exposure and Service Invocation Full Process Above #

In the previous lesson, we explained the core interface of Protocol. In this lesson, we will focus on the core implementation of Protocol using Protocol interface as the core. The following diagram shows the inheritance relationship of the Protocol interface:

Drawing 0.png

Protocol interface inheritance diagram

Among them, the AbstractProtocol provides some common capabilities and fields that Protocol implementations need. It has three core fields:

  • exporterMap (Map<String, Exporter<?>> type): Used to store the exported services. The key in the map is the service identifier created by the ProtocolUtils.serviceKey() method. ProtocolUtils maintains a multi-level map structure (as shown in the following figure). First, it is grouped by group. In practice, we can set the group according to our needs, such as grouping by data center or region for closer invocation. In GroupServiceKeyCache, it is classified according to serviceName, serviceVersion, and port, and the cached serviceKey is formed by concatenating the three.

Lark20201016-164613.png

groupServiceKeyCacheMap structure diagram

  • serverMap (Map type): Records all instances of ProtocolServer. The key is a string composed of host and port, and the value is the ProtocolServer that listens to that address. ProtocolServer is a simple encapsulation of RemotingServer and represents a server.
  • invokers (Set<Invoker<?>> type): A collection of service references.

AbstractProtocol does not implement the export() method of Protocol. The implementation of the refer() method is delegated to the abstract method protocolBindingRefer() and then implemented by the subclass. The only method implemented by AbstractProtocol is the destory() method. It first traverses the Invokers collection to destroy all service references, and then traverses the exporterMap collection to destroy the published services. The specific implementation is as follows:

public void destroy() {

    for (Invoker<?> invoker : invokers) {

        if (invoker != null) {

            invokers.remove(invoker);

            invoker.destroy(); // Close all service references

        }

    }

    for (String key : new ArrayList<String>(exporterMap.keySet())) {

        Exporter<?> exporter = exporterMap.remove(key);

        if (exporter != null) {

            exporter.unexport(); // Close the exposed services

        }

    }

}

Export process overview #

After understanding the common capabilities provided by AbstractProtocol, let’s analyze the implementation of DubboProtocol, the Protocol implementation class used by Dubbo by default. Here we first focus on the export() method of DubboProtocol, which is the implementation related to service publication, as shown below:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

    URL url = invoker.getUrl();

    // Create ServiceKey, the core implementation has been analyzed in detail in the previous text, and will not be repeated here

    String key = serviceKey(url); 

    // Wrap the Invoker object passed in by the upper layer into DubboExporter, and then record it in the exporterMap collection

    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);

    exporterMap.put(key, exporter);

    ... // Omit some log operations

    // Start ProtocolServer

    openServer(url);

    // Perform optimization processing for serialization

    optimizeSerialization(url);

    return exporter;

}

1. DubboExporter #

The first point involved here is the encapsulation of Invoker by DubboExporter. The inheritance relationship of DubboExporter is shown in the following figure:

Drawing 2.png

DubboExporter inheritance diagram

AbstractExporter maintains an Invoker object and an unexported field (boolean type). In the unexport() method, the unexported field is set to true, and the destroy() method of the Invoker object is called to destroy it.

DubboExporter is also relatively simple. It maintains the ServiceKey corresponding to the underlying Invoker and the exportMap collection in DubboProtocol. In its unexport() method, in addition to calling the unexport() method of the parent class AbstractExporter, it also cleans up the corresponding elements of this DubboExporter instance in the exportMap.

2. Server initialization #

After understanding the implementation of Exporter, let’s continue to look at the process of service publication in DubboProtocol. From the following call stack diagram, we can see that the openServer() method will call the Exchange layer, Transport layer all the way, and finally create a NettyServer to receive client requests.

Drawing 3.png

Call stack of the export() method

Next, we will introduce each method called in the export() method stack one by one.

First, in the openServer() method, it will determine whether it is a server based on the URL. Only the server can create a ProtocolServer and provide services externally. If the call comes from the server side, it will check whether there is a ProtocolServer listening to the address specified by the URL using the serverMap collection. If not, it will call the createServer() method to create one. The specific implementation of the openServer() method is as follows:

private void openServer(URL url) {

    String key = url.getAddress(); // Get the address in the format of host:port

    boolean isServer = url.getParameter(IS_SERVER_KEY, true);

    if (isServer) { // Only the server can start the server object

        ProtocolServer server = serverMap.get(key);

        if (server == null) { // No ProtocolServer is listening to this address

            synchronized (this) { // DoubleCheck, to prevent concurrency issues

                server = serverMap.get(key);
if (server == null) {

    // Call createServer() method to create ProtocolServer object

    serverMap.put(key, createServer(url));

}

}

else {

    // If there is already a ProtocolServer instance, try to reset ProtocolServer based on URL information

    server.reset(url);

}

}




createServer() method first adds some default values for the URL and performs some parameter value checks, mainly five.

* The value of HEARTBEAT_KEY parameter, default value is 60000, which means the default heartbeat interval is 60 seconds.
* The value of CHANNEL_READONLYEVENT_SENT_KEY parameter, default value is true, which means that ReadOnly requests need to block and wait for a response to return. When the server is closed, only ReadOnly requests can be sent. This CHANNEL_READONLYEVENT_SENT_KEY parameter determines whether the responses need to wait for a return.
* The value of CODEC_KEY parameter, default value is dubbo. You can review the parameters of @Adaptive annotation in Codec2 interface, which all retrieve the value of CODEC_KEY parameter from this URL.
* Check whether the specified extension implementation name of the SERVER_KEY parameter is legal, default value is netty. You can review the parameters of @Adaptive annotation in Transporter interface, which determines the network library used by the Transport layer, and the default implementation is Netty 4.
* Check whether the specified extension implementation name of the CLIENT_KEY parameter is legal. The process of checking is the same as the SERVER_KEY parameter.


After setting the default parameter values above, we can create an ExchangeServer through the Exchangers facade class and wrap it into a DubboProtocolServer.


private ProtocolServer createServer(URL url) {

    url = URLBuilder.from(url)

            // Whether the ReadOnly request needs to block and wait

            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())

            // Heartbeat interval

            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))

            .addParameter(CODEC_KEY, DubboCodec.NAME) // Codec2 extension implementation

            .build();

    // Check whether the extension implementation of the Transporter specified by the SERVER_KEY parameter is legal

    String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); 

    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {

        throw new RpcException("...");

    }

    // Create ExchangeServer through the Exchangers facade class

    ExchangeServer server = Exchangers.bind(url, requestHandler);

    ... // Check whether the extension implementation of the Transporter specified by the CLIENT_KEY parameter is legal (skipped)

    // Wrap the ExchangeServer into DubboProtocolServer and return

    return new DubboProtocolServer(server);

}


There are several details in the createServer() method that need to be analyzed. The first one is when creating ExchangeServer, the implementation of the Codec2 interface used is actually DubboCountCodec, the corresponding SPI configuration file is as follows:

![Drawing 4.png](../images/CgqCHl-FTK-AUlLCAADTWhhySe8432.png)

Codec2 SPI configuration file

DubboCountCodec maintains a DubboCodec object, and the encoding and decoding capabilities are provided by DubboCodec. DubboCountCodec is only responsible for controlling the readerIndex pointer of the ChannelBuffer during the decoding process, the specific implementation is as follows:


public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {

    int save = buffer.readerIndex(); // First, save the readerIndex pointer

    // Create a MultiMessage object that can store multiple messages

    MultiMessage result = MultiMessage.create(); 

    do {

        // Decode a message using the decoding capability provided by DubboCodec

        Object obj = codec.decode(channel, buffer);

        // If the number of readable bytes is not enough for a message, the readerIndex pointer will be reset

        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {

            buffer.readerIndex(save);

            break;

        } else { // Add the successfully decoded message to MultiMessage for temporary storage

            result.addMessage(obj);

            logMessageLength(obj, buffer.readerIndex() - save);

            save = buffer.readerIndex();

        }

    } while (true);

    if (result.isEmpty()) { // If no messages are decoded, return NEED_MORE_INPUT error code

        return Codec2.DecodeResult.NEED_MORE_INPUT;

    }

    if (result.size() == 1) { // If only one message is decoded, return that message directly

        return result.get(0);

    }

// If multiple messages are decoded, MultiMessage will be returned

return result; }

Both DubboCountCodec and DubboCodec implement the Codec2 interface introduced in Lesson 22. DubboCodec is a subclass of ExchangeCodec.

Drawing 5.png

Inheritance diagram of DubboCountCodec and DubboCodec

We know that ExchangeCodec only handles the request header of the Dubbo protocol, while DubboCodec adds the functionality of parsing the Dubbo message body on top of ExchangeCodec through inheritance. When we introduced the implementation of ExchangeCodec in Lesson 22, we focused on the encodeRequest() method, which is responsible for encoding the Request. In this method, the encodeRequestData() method is called to encode the request body.

DubboCodec overrides the encodeRequestData() method to encode the Request body in the format of the Dubbo protocol. The specific implementation is as follows:

protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {

    // The content related to the request body is encapsulated in RpcInvocation

    RpcInvocation inv = (RpcInvocation) data; 

    out.writeUTF(version); // Write the version number

    String serviceName = inv.getAttachment(INTERFACE_KEY);

    if (serviceName == null) {

        serviceName = inv.getAttachment(PATH_KEY);

    }

    // Write the service name

    out.writeUTF(serviceName);

    // Write the service version

    out.writeUTF(inv.getAttachment(VERSION_KEY));

    // Write the method name

    out.writeUTF(inv.getMethodName());

    // Write the parameter type list

    out.writeUTF(inv.getParameterTypesDesc());

    // Write all parameters one by one

    Object[] args = inv.getArguments();

    if (args != null) {

        for (int i = 0; i < args.length; i++) {

            out.writeObject(encodeInvocationArgument(channel, inv, i));

        }

    }

    // Write all attachments

    out.writeAttachments(inv.getObjectAttachments());

}

RpcInvocation implements the Invocation interface introduced in the previous lesson, as shown in the following diagram:

Drawing 6.png

Inheritance diagram of RpcInvocation

The following are the core fields in RpcInvocation, and reading and writing these fields can implement all the methods of the Invocation interface.

  • targetServiceUniqueName (String type): The unique service name to be called, which is actually the ServiceKey, a string composed of interface/group:version.
  • methodName (String type): The name of the target method to be called.
  • serviceName (String type): The name of the target service to be called, in this example, it is org.apache.dubbo.demo.DemoService.
  • parameterTypes (Class[] type): Records all the parameter types of the target method.
  • parameterTypesDesc (String type): Signature of the parameter list.
  • arguments (Object[] type): Specific parameter values.
  • attachments (Map type): Additional information for this invocation that can be serialized into the request.
  • attributes (Map type): Attribute information for this invocation that cannot be sent out.
  • invoker (Invoker type): Invoker object associated with this invocation.
  • returnType (Class type): Type of the return value.
  • invokeMode (InvokeMode type): Mode of this invocation, divided into SYNC, ASYNC, and FUTURE.

In the inheritance diagram above, we can see a subclass of RpcInvocation called DecodeableRpcInvocation, which is used for decoding. The decode() method implemented by DecodeableRpcInvocation is exactly the decoding operation corresponding to the DubboCodec.encodeRequestData() method. This method is called in the DubboCodec.decodeBody() method, and the call stack is shown in the following diagram:

Drawing 7.png

Call stack of decode() method

There is a detail in the decoding process. In the DubboCodec.decodeBody() method, there is the following code snippet, which determines whether to decode in the DubboCodec based on the DECODE_IN_IO_THREAD_KEY parameter (DubboCodec is called in the IO thread).

// decode request.

Request req = new Request(id);

... // Omitted the setting of other fields in Request

Object data;

DecodeableRpcInvocation inv;

// This checks the DECODE_IN_IO_THREAD_KEY parameter

if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {

    inv = new DecodeableRpcInvocation(channel, req, is, proto);

    inv.decode(); // Call decode() method directly to decode in the current IO thread

} else { // Only read data here, no decoding is performed in the current IO thread

    inv = new DecodeableRpcInvocation(channel, req,

            new UnsafeByteArrayInputStream(readMessageData(is)), proto);

}

data = inv;
req.setData(data); // set data field to Request object

return req;

If decoding is not done in DubboCodec, where will it be decoded? You can review the DecodeHandler introduced in Lesson 20. Its received() method can also perform decoding. In addition, DecodeableRpcInvocation has a field called hasDecoded to determine whether decoding has been completed. With these three working together, the thread for executing the decoding operation can be determined based on the DECODE_IN_IO_THREAD_KEY parameter.

If you are not clear about the thread model, you can review the bind() methods of the three facade classes: Exchangers, HeaderExchanger, and Transporters, as well as the thread models provided by the various implementations of Dispatcher. Understand which ChannelHandler is executed by which thread. These topics have been introduced in previous lessons and will not be repeated. Here we will directly give the conclusion using the implementation of AllDispatcher as an example.

  • The ChannelHandler implementations executed in the IO thread are: InternalEncoder, InternalDecoder (both of which ultimately call DubboCodec), IdleStateHandler, MultiMessageHandler, HeartbeatHandler, and NettyServerHandler.
  • The ChannelHandler implementations executed in non-IO threads are: DecodeHandler, HeaderExchangeHandler, and DubboProtocol$requestHandler.

In DubboProtocol, there is a requestHandler field, which is an instance of an anonymous inner class implementing the ExchangeHandlerAdapter abstract class, indirectly implementing the ExchangeHandler interface. Its core is the reply() method, implemented as follows:

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

    ... // This logic for checking the message type is omitted here, based on the previous Handler processing, the message received here must be an Invocation object

    Invocation inv = (Invocation) message;

    // Get the Invoker object for this call

    Invoker<?> invoker = getInvoker(channel, inv);

    ... // Details about the client callback will be explained later, not discussed here

    // Record the client's address in RpcContext

    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());

    // Execute the actual call

    Result result = invoker.invoke(inv);

    // Return the result

    return result.thenApply(Function.identity());

}

In the above code, the getInvoker() method first constructs the ServiceKey based on the information carried in the Invocation, and then searches for the corresponding DubboExporter object from the exporterMap collection and returns the underlying Invoker object. Here is the specific implementation:

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {

    ... // Omitted the logic for handling client Callbacks and stubs, which will be introduced separately later

    String serviceKey = serviceKey(port, path, (String) inv.getObjectAttachments().get(VERSION_KEY),
                                 (String) inv.getObjectAttachments().get(GROUP_KEY));

    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

    ... // If the corresponding DubboExporter object cannot be found, an exception will be thrown directly. This check is omitted here

    return exporter.getInvoker(); // Get the Invoker object from the exporter

}

So far, we finally see the invocation of the Invoker object. The introduction and analysis of the Invoker implementation will be covered in future lessons. Here, we will focus on the related content of DubboProtocol.

3. Serialization Optimization #

Now let’s go back to the DubboProtocol.export() method for further analysis. After the ProtocolServer is started, the export() method will finally call the optimizeSerialization() method to optimize the specified serialization algorithm.

Here’s some background information: When using certain serialization algorithms (such as Kryo, FST, etc.), in order to achieve optimal performance, it is best to register the classes that need to be serialized in advance in the Dubbo system. For example, we can use an optimizer that implements the SerializationOptimizer interface and specify it in the configuration, as shown in the example code below:

public class SerializationOptimizerImpl implements SerializationOptimizer {

    public Collection<Class> getSerializableClasses() {

        List<Class> classes = new ArrayList<>();

        classes.add(xxxx.class); // Add the classes that need to be serialized

        return classes;

    }

}

In the optimizeSerialization() method of DubboProtocol, the optimizer class, which implements the SerializationOptimizer interface, is obtained based on the value of the optimizer parameter in the URL. The relevant classes are then registered by the underlying serialization algorithm, resulting in significantly improved serialization performance. When performing serialization, you will inevitably cascade into many internal classes in Java (such as arrays, various types of collections, etc.). Kryo, FST, and other serialization algorithms have automatically registered commonly used classes from the JDK, so there is no need to register them again.

Now let’s take a look at the optimizeSerialization() method to understand the specific implementation details of serialization optimization:

private void optimizeSerialization(URL url) throws RpcException {

    // Determine the class that implements the SerializationOptimizer interface based on the value of the optimizer parameter in the URL

    String className = url.getParameter(OPTIMIZER_KEY, "");

    Class clazz = Thread.currentThread().getContextClassLoader().loadClass(className);

    // Create an instance of the SerializationOptimizer implementation class

    SerializationOptimizer optimizer = (SerializationOptimizer) clazz.newInstance();

    // Call the getSerializableClasses() method to get the classes that need to be registered

    for (Class c : optimizer.getSerializableClasses()) {

        SerializableClassRegistry.registerClass(c); 

    }

    optimizers.add(className);

}

SerializableClassRegistry maintains a static Map (REGISTRATIONS field), and the registerClass() method temporarily stores the classes to be optimized in this collection. When using Kryo, FST, and other serialization algorithms, the classes in this collection will be accessed to complete the registration operation. The following diagram shows the relevant call relationship:

getRegisteredClasses() method call location

According to the official Dubbo documentation, even without registering any classes for optimization, Kryo and FST generally outperform Hessian2 and Dubbo serialization.

Summary #

In this lesson, we focused on the core process of publishing a Dubbo service through the DubboProtocol. First, we introduced the common capabilities and fields that the AbstractProtocol abstract class provides to Protocol implementations. Then, taking the implementation of DubboProtocol corresponding to the Dubbo protocol as an example, we explained the core process of publishing a Dubbo service, which involves the entire server’s core startup process, RpcInvocation implementation, the invocation of the DubboProtocol.requestHandler field on the Invoker object, and serialization-related optimization.