29 Learning Netty Network Programming Techniques from RocketMQ #

Network Design from the Class-System Perspective #

The core class diagram of RocketMQ for networking is as follows:

[Figure 1: Core class diagram of RocketMQ’s networking module]

Next, let’s introduce the main responsibilities of each class one by one.

RemotingService #

The base interface for RPC remote services. It defines the basic methods shared by all remote service classes:

  • void start(): Start the remote service.
  • void shutdown(): Shut down the remote service.
  • void registerRPCHook(RPCHook rpcHook): Register an RPC hook function to execute customized logic before and after network operations.

RemotingServer/RemotingClient #

The base interfaces for the remote server and client. Their methods are largely the same, so here we focus on RemotingServer, whose methods are the “business methods” for RPC remote operations.

void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor)

Registers a command processor. This is the key highlight of RocketMQ’s Netty network design. RocketMQ splits its business logic by network operation, such as message sending and message pulling. Each operation is identified by a request code (requestCode), and each code corresponds to a business processor, a NettyRequestProcessor. A different thread pool can be assigned to each request code, which isolates the thread pools used by different kinds of requests. The parameters are as follows.

int requestCode

The command code. All request commands in RocketMQ are defined in RequestCode.

NettyRequestProcessor processor

The business processor for the request. For example, SendMessageProcessor handles message sending and PullMessageProcessor handles message pulling.

ExecutorService executor

The thread pool in which the processor’s business logic runs.

Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(int requestCode)

Get the corresponding request business processor and thread pool based on the request code.

RemotingCommand invokeSync(Channel channel, RemotingCommand request, long timeoutMillis)

Synchronous invocation: the calling thread blocks until the response arrives or the timeout elapses. The parameters are as follows:

  • Channel channel: The Netty network channel.

  • RemotingCommand request: The RPC request message; every request is encapsulated in this object.

  • long timeoutMillis: The timeout, in milliseconds.

void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)

Asynchronous invocation: the method returns immediately, and the response is delivered later to invokeCallback.

void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis)

One-way invocation: the request is sent without waiting for, or receiving, a response.
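The difference between the async and oneway styles lies in what, if anything, the caller hears back. The JDK-only sketch below makes this concrete under stated assumptions: a single-threaded `fakeServer` executor stands in for both the Netty channel and the remote end, payloads are plain strings, and `AsyncOnewayClient` is a hypothetical class rather than RocketMQ’s API.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical client: async calls get a callback later, oneway calls get nothing back.
class AsyncOnewayClient {
    private final AtomicInteger opaqueGen = new AtomicInteger();
    private final Map<Integer, Consumer<String>> responseTable = new ConcurrentHashMap<>();
    // Stands in for the channel plus the remote server; single-threaded, so requests are handled in order.
    private final ExecutorService fakeServer = Executors.newSingleThreadExecutor();
    // Runs user callbacks off the thread that delivers responses (cf. callbackExecutor).
    private final ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
    final AtomicInteger onewayReceived = new AtomicInteger();

    // Registers the callback under a fresh opaque, sends, and returns without waiting.
    void invokeAsync(String body, Consumer<String> callback) {
        int opaque = opaqueGen.incrementAndGet();
        responseTable.put(opaque, callback);
        fakeServer.execute(() -> onResponse(opaque, "echo:" + body)); // the "server" echoes the body
    }

    // Sends and forgets: nothing is registered, so no response is expected or handled.
    void invokeOneway(String body) {
        fakeServer.execute(onewayReceived::incrementAndGet);
    }

    // Invoked when a response arrives: find the waiting callback by opaque and run it.
    private void onResponse(int opaque, String response) {
        Consumer<String> callback = responseTable.remove(opaque);
        if (callback != null) {
            callbackExecutor.execute(() -> callback.accept(response));
        }
    }

    void shutdown() {
        fakeServer.shutdown();
        callbackExecutor.shutdown();
    }
}
```

Timeouts and concurrency limits are left out to keep the sketch short.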

NettyRemotingAbstract #

NettyRemotingAbstract is the abstract, Netty-based implementation of the remote service. It defines the logic for handling remote calls, requests, and responses. Its core properties and design principles are as follows.

NettyRemotingAbstract Core Properties:

  • Semaphore semaphoreOneway: Controls the concurrency of oneway send mode with a semaphore, with a default of 65535 permits.
  • Semaphore semaphoreAsync: Controls the concurrency of asynchronous send mode with a semaphore, with a default of 65535 permits.
  • ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable: The table of requests currently waiting for a response from the remote end. opaque is the request number; it must be unique and is usually generated by an atomic increment. When a client sends a request to the remote end over a single long-lived connection, it immediately returns a ResponseFuture to the caller and puts the request into this table. The response carries the same opaque, so when it arrives, the matching ResponseFuture is looked up in this table and the result is handed to the caller. This is a classic application of the Future pattern in network programming.
  • HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable: The registered request processors, keyed by request code. By binding different request codes to different thread pools, RocketMQ isolates its business thread pools.
  • Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor: The default processor and thread pool, used for request codes that have no registered processor.
  • List<RPCHook> rpcHooks: List of registered RPC hooks.
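The two semaphores cap how many async or oneway requests may be in flight at once. The sketch below illustrates the idea with java.util.concurrent.Semaphore; `InFlightLimiter` and `ReleaseOnce` are hypothetical names, and acquiring with a timeout is an assumption about how a caller would use them. The release-once wrapper matters because one request can be finished by more than one path (a response, a send failure, a timeout scan), and each path may try to return the permit. RocketMQ’s own helper for this purpose is called SemaphoreReleaseOnlyOnce.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Wraps one acquired permit and releases it at most once.
class ReleaseOnce {
    private final Semaphore semaphore;
    private final AtomicBoolean released = new AtomicBoolean(false);

    ReleaseOnce(Semaphore semaphore) { this.semaphore = semaphore; }

    void release() {
        if (released.compareAndSet(false, true)) { // only the first caller releases
            semaphore.release();
        }
    }
}

// Hypothetical limiter for in-flight async (or oneway) requests.
class InFlightLimiter {
    final Semaphore permits;

    InFlightLimiter(int maxInFlight) { this.permits = new Semaphore(maxInFlight, true); }

    // Returns a handle to release once the request completes (response, failure or timeout),
    // or null if no permit became free within timeoutMillis.
    ReleaseOnce tryBegin(long timeoutMillis) {
        try {
            if (permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return new ReleaseOnce(permits);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null; // too many requests in flight: fail fast instead of piling up
    }
}
```

Failing fast when no permit is free keeps a slow server from causing unbounded queuing on the client.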

NettyRemotingClient #

The Netty-based network client. It implements the RemotingClient interface and extends NettyRemotingAbstract.

Its core properties are as follows:

  • NettyClientConfig nettyClientConfig: Network-related configuration.
  • Bootstrap bootstrap: Netty’s client startup helper class.
  • EventLoopGroup eventLoopGroupWorker: The Netty client’s worker thread group, commonly known as the IO threads.
  • ConcurrentMap<String /* addr */, ChannelWrapper> channelTables: Connections (Netty channels) already created by this client, with one long-lived connection per address.
  • ExecutorService publicExecutor: The default task thread pool.
  • ExecutorService callbackExecutor: The thread pool that runs the callbacks of asynchronous requests.
  • DefaultEventExecutorGroup defaultEventExecutorGroup: The thread group in which Netty ChannelHandlers execute.

NettyRemotingServer #

The Netty-based network server.

The core properties are as follows:

  • ServerBootstrap serverBootstrap: Netty’s server startup helper class.
  • EventLoopGroup eventLoopGroupSelector: The Netty server’s worker thread group, i.e. the sub-Reactor in the master-slave Reactor pattern, responsible for read and write events.
  • EventLoopGroup eventLoopGroupBoss: The Netty boss thread group, i.e. the main Reactor in the master-slave Reactor thread model, mainly responsible for OP_ACCEPT events (accepting new connections).
  • NettyServerConfig nettyServerConfig: Netty server configuration.
  • Timer timer = new Timer("ServerHouseKeepingService", true): A timer that periodically scans responseTable in NettyRemotingAbstract and removes timed-out requests.
  • DefaultEventExecutorGroup defaultEventExecutorGroup: The thread group in which Netty ChannelHandlers execute.
  • int port: The port the server binds to.
  • NettyEncoder encoder: The encoder for the RocketMQ communication protocol.
  • NettyDecoder decoder: The decoder for the RocketMQ communication protocol.
  • NettyConnectManageHandler connectionManageHandler: The Netty connection management handler, mainly responsible for tracking connection state.
  • NettyServerHandler serverHandler: The core business handler of the Netty server.
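The housekeeping timer’s job can be sketched with plain JDK types: walk responseTable, remove every request whose deadline has passed, and fail its future so that anything waiting on it is released. `PendingRequest`, `HouseKeeper`, and the explicit `now` parameter (which makes the scan testable) are assumptions for this sketch, not RocketMQ code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

// Hypothetical pending request: when it started, how long it may wait, and its future.
class PendingRequest {
    final long beginTimestamp;
    final long timeoutMillis;
    final CompletableFuture<String> future = new CompletableFuture<>();

    PendingRequest(long beginTimestamp, long timeoutMillis) {
        this.beginTimestamp = beginTimestamp;
        this.timeoutMillis = timeoutMillis;
    }
}

class HouseKeeper {
    final Map<Integer, PendingRequest> responseTable = new ConcurrentHashMap<>();

    // Removes expired requests, fails their futures, and returns the removed opaques.
    List<Integer> scanResponseTable(long now) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, PendingRequest>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, PendingRequest> entry = it.next();
            Integer opaque = entry.getKey();
            PendingRequest req = entry.getValue();
            if (req.beginTimestamp + req.timeoutMillis <= now) {
                it.remove(); // ConcurrentHashMap iterators support removal
                req.future.completeExceptionally(new TimeoutException("opaque " + opaque));
                expired.add(opaque);
            }
        }
        return expired;
    }

    // Runs the scan periodically on a daemon thread, like the server's housekeeping Timer.
    Timer start(long periodMillis) {
        Timer timer = new Timer("ServerHouseKeepingService", true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                scanResponseTable(System.currentTimeMillis());
            }
        }, periodMillis, periodMillis);
        return timer;
    }
}
```

Without such a scan, requests whose responses never arrive would stay in responseTable forever and leak memory.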

NettyRequestProcessor #

The request command processor interface. Its implementations carry the server’s business logic, for example SendMessageProcessor for message sending.

NettyRemotingServer and NettyRemotingClient are discussed further in the following sections. A class diagram alone does not convey the essence of the Netty network design intuitively, so the following flow chart further illustrates how RocketMQ’s networking works.

[Figure 2: Flow chart of RocketMQ network request processing]

The key points are explained as follows:

The flowchart omits the initialization of NettyRemotingClient and NettyRemotingServer, which is discussed in detail later.

  • When NettyRemotingClient needs to connect to a specified address, it creates a Channel with Netty’s APIs and caches it, so that subsequent requests to the same address can reuse the connection.
  • NettyRemotingClient then sends the request with a method such as invokeAsync. During sending, Netty performs an important step: request encoding, which converts the request (a RemotingCommand) into binary data according to a specific format (the protocol).
  • When NettyRemotingServer receives the binary data, a read event fires and read processing begins. First, a complete request packet must be identified in the binary data and converted into a request object (RemotingCommand); this is called decoding. The read event is then propagated to NettyServerHandler, which eventually calls NettyRemotingAbstract’s processRequestCommand. That method looks up the thread pool and NettyRequestProcessor registered for the request’s requestCode and executes the corresponding logic there. The result is returned to the client over the network.
  • When the client receives the server’s response, a read event fires, the data is decoded (NettyDecoder), and the read event is then propagated to NettyClientHandler, which handles the response.

Key Points of Netty Network Programming #

Now that we understand the basic flow, let’s study the implementation of NettyRemotingServer and NettyRemotingClient to learn the techniques for writing Netty servers and clients.

Network programming typically has to solve the following problems:

  • Establishing network connections
  • Designing the communication protocol
  • Choosing the thread model

Network-based Programming #

Network programming is fundamentally about exchanging binary streams. To make this concrete, let’s take the familiar Dubbo RPC call as an example. Dubbo’s communication process is as follows:

[Figure 3: Dubbo communication process]

Consider an order service, order-service-app. Many users place orders at the same time, and each order is handled by a thread in order-service-app. Placing an order requires calling the coupon microservice, so multiple threads issue RPC calls to the coupon service through the Dubbo client. What does this process involve?

  1. Create a TCP connection. By default, the Dubbo client and Dubbo server will keep a long-lived connection and use this connection to send all network requests from the client to the server.
  2. Convert the request into a binary stream. Multiple requests are sent one after another over a single connection, so how does the server extract each complete request from the binary stream? A Dubbo request body, for example, must carry the remote service name and the request parameters. This calls for a so-called “custom protocol”, i.e. an agreed “communication specification”.
  3. The client converts each request into a binary stream according to the protocol; this is called “encoding”. The server identifies individual requests in the binary stream according to the same protocol; this is called “decoding”.
  4. After the server decodes the request, it needs to execute the corresponding business logic. In network communication, there are usually two types of threads involved: IO threads and business thread pools. Usually, IO threads are responsible for request parsing, while business thread pools execute business logic, decoupling IO reading and writing from business logic as much as possible.

Next, let’s look at how RocketMQ uses Netty to solve these problems.

Netty Client Programming Practice #

1. Client Creation Sample and Key Points

In RocketMQ, the implementation class of the client is NettyRemotingClient. The core code for client creation is encapsulated in the start method, as shown in the following screenshot:

[Figure 4: Client creation code in NettyRemotingClient’s start method]

The above code is basically a standard template for creating a client using Netty, and the key points are explained as follows.

Create a DefaultEventExecutorGroup, the default event execution thread group. Event handlers that are later added to the ChannelPipeline with addLast together with this group run in it rather than in the IO threads; it is essentially a thread pool.

Create a Netty client with Netty’s Bootstrap helper class. The group method specifies the event loop group (EventLoopGroup), i.e. the worker thread group; each event loop wraps an event selector (java.nio.channels.Selector). By default, read and write events are executed in this thread group, whose threads are commonly called IO threads. This default can be changed, as explained later. The channel method specifies the channel type, usually NioSocketChannel for NIO-based clients.

Set network communication parameters through the option method of Bootstrap. In most cases, the following parameters are specified:

  • TCP_NODELAY: Whether to disable Nagle’s algorithm. If true, packets are sent immediately. If false, small packets may be held back briefly so that they can be combined with subsequent data into fewer, larger packets.
  • SO_KEEPALIVE: Whether to enable TCP keepalive probes, which detect dead peers on idle connections. Following common practice, RocketMQ sets this to false and detects idle connections at the application level with IdleStateHandler instead.
  • CONNECT_TIMEOUT_MILLIS: Connection timeout. If the client cannot establish a connection within this time, a timeout exception is thrown, indicating that the connection attempt failed.
  • SO_SNDBUF and SO_RCVBUF: The sizes of the socket send and receive buffers. RocketMQ defaults both to 65535 bytes, about 64 KB.
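These ChannelOption constants correspond to standard socket options, so their effect can be tried on a plain java.net.Socket without Netty. The sketch below applies the settings above to an unconnected socket. The 3000 ms connect timeout is an assumed value, and because SO_SNDBUF and SO_RCVBUF are only hints that the OS may round, their sizes should be read back rather than assumed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;

// Plain-JDK counterparts of the Netty ChannelOption settings discussed above.
class SocketOptionsDemo {
    static final int BUFFER_SIZE = 65535;        // the ~64 KB buffer size mentioned above
    static final int CONNECT_TIMEOUT_MS = 3000;  // assumed value, for illustration only

    // Applies the options on an unconnected socket, before connect().
    static Socket configure(Socket socket) {
        try {
            socket.setTcpNoDelay(true);               // TCP_NODELAY: disable Nagle's algorithm
            socket.setKeepAlive(false);               // SO_KEEPALIVE: no TCP keepalive probes
            socket.setSendBufferSize(BUFFER_SIZE);    // SO_SNDBUF: a hint, the OS may adjust it
            socket.setReceiveBufferSize(BUFFER_SIZE); // SO_RCVBUF: likewise a hint
            return socket;
        } catch (SocketException e) {
            throw new UncheckedIOException(e);
        }
    }

    // CONNECT_TIMEOUT_MILLIS counterpart: connect() throws SocketTimeoutException
    // if the connection is not established in time.
    static Socket connect(String host, int port) throws IOException {
        Socket socket = configure(new Socket());
        try {
            socket.connect(new InetSocketAddress(host, port), CONNECT_TIMEOUT_MS);
            return socket;
        } catch (IOException e) {
            socket.close(); // do not leak the socket when the connect fails
            throw e;
        }
    }
}
```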

Set up the event handling chain with Bootstrap’s handler method, usually by passing new ChannelInitializer<SocketChannel>().

Build the event handling chain with ChannelPipeline’s addLast method. This is Netty’s core extension point: the application’s business logic is plugged into the chain as event handlers. RocketMQ’s event handling chain is as follows:

  • NettyEncoder: RocketMQ request encoder, i.e., protocol encoder.
  • NettyDecoder: RocketMQ request decoder, i.e., protocol decoder.
  • IdleStateHandler: Idle detection.
  • NettyConnectManageHandler: Connection manager.
  • NettyClientHandler: Netty client business handler, which handles “business logic”.

In summary, Netty-based programming mainly involves defining a communication protocol (encoding and decoding) and business processing. Each of them will be introduced below.

The key point of the addLast method of ChannelPipeline:

[Figure 5: The addLast method of ChannelPipeline]

If no EventExecutorGroup is passed to addLast, the handler runs on the channel’s event loop, i.e. in the work (IO) thread group. If a group is passed, the handler runs in that group instead.
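The effect of passing or omitting an EventExecutorGroup can be imitated with plain executors. In the hypothetical `ToyPipeline` below (a toy model, not Netty’s API), a handler added without a group runs on the single “IO thread” executor, while a handler added with a group runs on that group’s thread, which is what keeps slow handlers from stalling reads on the IO thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

// Toy model of addLast(handler) vs addLast(group, handler); not Netty's API.
class ToyPipeline {
    private static final class Stage {
        final Executor executor;
        final UnaryOperator<String> handler;

        Stage(Executor executor, UnaryOperator<String> handler) {
            this.executor = executor;
            this.handler = handler;
        }
    }

    private final Executor ioThread;               // plays the role of the channel's event loop
    private final List<Stage> stages = new ArrayList<>();

    ToyPipeline(Executor ioThread) { this.ioThread = ioThread; }

    // No group given: the handler runs on the IO thread.
    ToyPipeline addLast(UnaryOperator<String> handler) {
        return addLast(null, handler);
    }

    // Group given: the handler runs in that group instead.
    ToyPipeline addLast(Executor group, UnaryOperator<String> handler) {
        stages.add(new Stage(group != null ? group : ioThread, handler));
        return this;
    }

    // Passes a "read" event through every stage in order, on each stage's executor.
    CompletableFuture<String> fireChannelRead(String msg) {
        CompletableFuture<String> result = CompletableFuture.completedFuture(msg);
        for (Stage s : stages) {
            result = result.thenApplyAsync(s.handler, s.executor);
        }
        return result;
    }
}
```

Netty additionally keeps all events of one channel on the same executor of the group, which preserves per-channel ordering; the toy ignores that detail.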

2. Connection Creation and Key Points

The above initialization does not create a connection. In RocketMQ, a connection is created only when it is used, and once created, it can be reused and cached, which is commonly known as a “long-lived connection”. The example code for creating a connection based on Netty is as follows:

[Figure 6: Connection creation code]

This is basically a template for creating a connection based on Netty. The implementation key points are as follows:

  • Create the connection with Bootstrap’s connect method. The method does not block: it immediately returns a ChannelFuture, and the connection is then cached in channelTables.
  • Because connect only returns a Future, the caller usually has to wait synchronously for the connection to be established. It therefore calls awaitUninterruptibly on the ChannelFuture, bounded by the allowed connection timeout. After that method returns, the following check determines whether the connection was actually established:
```java
// The connection is usable only if the channel exists and is active.
public boolean isOK() {
    return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
}
```
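The create-on-first-use, reuse-afterwards pattern can be sketched without Netty. In this hypothetical version, a `Connector` function stands in for Bootstrap.connect, a CompletableFuture plays the role of the ChannelFuture, and ConcurrentHashMap.computeIfAbsent replaces the explicit locking a production client would use around channelTables.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stand-in for Bootstrap.connect(addr): returns at once with a future connection.
interface Connector {
    CompletableFuture<String> connect(String addr);
}

class ConnectionCache {
    // addr -> pending or established connection (cf. channelTables)
    private final ConcurrentMap<String, CompletableFuture<String>> channelTables = new ConcurrentHashMap<>();
    private final Connector connector;
    private final long connectTimeoutMillis;

    ConnectionCache(Connector connector, long connectTimeoutMillis) {
        this.connector = connector;
        this.connectTimeoutMillis = connectTimeoutMillis;
    }

    // Returns the connection for addr, creating it on first use; null if it is not ready in time.
    String getOrCreate(String addr) {
        // connect() does not block, so calling it inside computeIfAbsent is cheap.
        CompletableFuture<String> f = channelTables.computeIfAbsent(addr, connector::connect);
        try {
            // Wait for the connect, bounded by the connect timeout (the role awaitUninterruptibly plays above).
            return f.get(connectTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            channelTables.remove(addr, f); // evict the failed entry so a later call can retry
            return null;
        }
    }
}
```

A production client would also check, as isOK does, that a cached channel is still active before reusing it, and evict it otherwise.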

3. Request Sending Sample

Let’s take synchronous message sending as an example to see how message sending is used. The sample code is as follows:

[Figure 7: Synchronous request sending code]

Some key points of usage are as follows:

  • First, each request is assigned a unique number (a request ID), represented here by opaque. It only needs to be unique within a single client process.
  • Then, following the Future pattern, a ResponseFuture is created and put into ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable. When the client receives the server’s response, it uses the opaque to find the corresponding ResponseFuture and wake up the waiting caller.
  • The request is sent to the server with the Channel’s writeAndFlush method; internally, the NettyEncoder encodes the RemotingCommand into a binary stream. addListener registers a callback on the write result: if the write fails, the callback removes the ResponseFuture from responseTable and wakes up the caller immediately instead of letting it wait for the timeout.
  • The synchronous call itself is implemented by ResponseFuture’s waitResponse method, which blocks until the response arrives or the timeout elapses.
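The synchronous path above fits in a short JDK-only sketch. `ResponseFuture`, `waitResponse`, `responseTable`, and `opaque` follow the names used in this article; `putResponse`, `sendRequestOK`, `onResponse`, the `Transport` interface standing in for Channel.writeAndFlush plus its listener, and the String payloads are assumptions made for the sketch. A CountDownLatch provides the block-until-woken behavior.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// The caller blocks on the latch until putResponse() is called or the timeout elapses.
class ResponseFuture {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;
    volatile boolean sendRequestOK = true;

    void putResponse(String response) {
        this.response = response;
        latch.countDown();
    }

    String waitResponse(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response; // still null if the timeout elapsed first
    }
}

// Stand-in for channel.writeAndFlush(request).addListener(...): reports whether the write succeeded.
interface Transport {
    void writeAndFlush(int opaque, String body, Consumer<Boolean> writeListener);
}

class SyncClient {
    private final AtomicInteger opaqueGen = new AtomicInteger();
    final ConcurrentMap<Integer, ResponseFuture> responseTable = new ConcurrentHashMap<>();
    private final Transport transport;

    SyncClient(Transport transport) { this.transport = transport; }

    String invokeSync(String body, long timeoutMillis) throws InterruptedException {
        int opaque = opaqueGen.incrementAndGet();   // unique within this process
        ResponseFuture rf = new ResponseFuture();
        responseTable.put(opaque, rf);              // register before sending
        try {
            transport.writeAndFlush(opaque, body, ok -> {
                if (!ok) {                          // write failed: wake the caller immediately
                    rf.sendRequestOK = false;
                    rf.putResponse(null);
                }
            });
            String response = rf.waitResponse(timeoutMillis);
            if (response == null) {
                // A real client would throw distinct timeout / send-failure exceptions.
                throw new IllegalStateException(rf.sendRequestOK ? "timeout" : "send failed");
            }
            return response;
        } finally {
            responseTable.remove(opaque);           // never leave stale entries behind
        }
    }

    // Called by the client handler when a response carrying this opaque arrives.
    void onResponse(int opaque, String body) {
        ResponseFuture rf = responseTable.remove(opaque);
        if (rf != null) {
            rf.putResponse(body);
        }
    }
}
```

Registering the future before writing matters: a fast server could otherwise answer before the entry exists, and the response would be dropped.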

Netty Server Programming Practices #

1. Netty Server Creation Example

Step 1: Create the Boss and Work event thread groups. The Netty server uses the master-slave Reactor thread model with two thread groups, the Boss Group and the Work Group, as shown in the following diagram:

[Figure 8: Creating the Boss and Work thread groups]

Usually the Boss Group uses a single thread, and the size of the Work thread group is based on the number of CPU cores. The Work thread group is the IO thread pool that handles read and write events.

Step 2: Create the default event execution thread group.

[Figure 9: Creating the default event execution thread group]

This thread pool serves the same purpose as on the client, so it is not described again.

Step 3: Use the Netty ServerBootstrap server startup class to build the server. (Template)

[Figure 10: Building the server with ServerBootstrap]

The key points of building with ServerBootstrap are as follows:

  • Specify the boss and work thread groups with ServerBootstrap’s group method.
  • Specify the channel type with the channel method, usually NioServerSocketChannel or EpollServerSocketChannel.
  • Use the option method to set parameters of the server channel itself (NioServerSocketChannel or EpollServerSocketChannel), i.e. the listening socket that accepts client connections, such as SO_BACKLOG.
  • Use the childOption method to set parameters of each accepted connection (for example a NioSocketChannel), i.e. the sockets used for reading and writing.
  • Bind the server’s IP address and port with the localAddress method.
  • Set the actual processing chain with the childHandler method. This is the main entry point for application code in Netty, just as on the client. Among the handlers, ServerHandler is the server’s business handler; encoding and decoding work the same way as on the client.

Step 4: Call the bind method of ServerBootstrap to bind to the specified port.

[Figure 11: Binding the port with ServerBootstrap’s bind method]

ServerBootstrap’s bind method is non-blocking and returns a ChannelFuture. Calling sync() on that future blocks until binding completes, i.e. until the server has started.

2. Netty ServerHandler Writing Example

The server’s network communication mainly involves receiving requests and processing them, and then sending the response to the client. The entry point for processing requests usually involves defining a ChannelHandler. Let’s take a look at the Handler written in RocketMQ.

[Figure 12: RocketMQ’s NettyServerHandler]

The server’s business handler mainly receives client requests, so it focuses on read events. It usually extends SimpleChannelInboundHandler and implements channelRead0. Because the request has already been decoded by NettyDecoder, the handler receives a concrete request object (in RocketMQ, a RemotingCommand) and can program against it. The handler calls processMessageReceived, which is defined in NettyRemotingAbstract, the common parent class of NettyRemotingClient and NettyRemotingServer; for an incoming request, it dispatches to processRequestCommand.
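The routing step can be sketched as a template method in a shared parent class. The `Command` type, its flag bit marking responses, and the String return values (used only so the routing is observable) are assumptions for this illustration. The point is that one entry method serves both client and server, and the command type alone decides whether the request path or the response path runs.

```java
// Hypothetical wire command: request code, opaque, and a flag whose lowest bit marks a response.
class Command {
    static final int RESPONSE_BIT = 1;
    final int code;
    final int opaque;
    final int flag;

    Command(int code, int opaque, int flag) {
        this.code = code;
        this.opaque = opaque;
        this.flag = flag;
    }

    boolean isResponse() {
        return (flag & RESPONSE_BIT) == RESPONSE_BIT;
    }
}

// Shared parent of client and server: one entry point, routed by command type.
abstract class RemotingBase {
    final String processMessageReceived(Command cmd) {
        return cmd.isResponse()
                ? processResponseCommand(cmd)  // e.g. complete the ResponseFuture for cmd.opaque
                : processRequestCommand(cmd);  // e.g. look up the processor for cmd.code
    }

    abstract String processRequestCommand(Command cmd);

    abstract String processResponseCommand(Command cmd);
}
```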

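In Netty 4, annotating a ChannelHandler with @ChannelHandler.Sharable declares that a single instance may be added to multiple pipelines, i.e. shared by many channels. The annotation does not make the handler thread-safe by itself: a shared handler is invoked from multiple IO threads, so it must be stateless or otherwise thread-safe.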

Friendly reminder: a ChannelHandler usually does not execute business logic itself; it only dispatches requests to a thread pool behind it, which decouples the work asynchronously. RocketMQ’s implementation follows this closely and isolates thread pools by business, for example creating different thread pools for message sending and message pulling. This is explained in detail below.

Protocol Encoder and Decoder #

In network programming, designing the communication protocol is the most important task. A common approach to protocol design is as follows:

[Figure 13: Typical Header + Body protocol structure]

The structure is usually Header + Body. The Header usually has a fixed length and contains a field holding the length of the entire message; whether other fields go into the Header depends on the protocol. This structure is classic, simple to implement, and particularly well suited to decoding requests from a binary stream. The key points are as follows:

  • The receiver first tries to read Header-length bytes from the binary stream. If fewer bytes are currently available, it accumulates them and waits for more data to arrive.
  • Once the Header can be read, the receiver extracts the total message length according to the Header format and then tries to read that many bytes. If not enough bytes are available, a complete message has not yet arrived, so it waits for more data. Once a complete message is in the accumulated buffer, it is decoded according to the message format, converting the binary data into a request object of a specific type, such as RocketMQ’s RemotingCommand.
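The two reading rules amount to a small loop over an accumulation buffer. The JDK-only sketch below assumes the simplest Header + Body variant: a 4-byte big-endian length field that counts only the bytes after it, followed by the body. `FrameAccumulator` is a hypothetical name; the sketch shows how a frame split across reads, or two frames arriving in one read, are handled.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Accumulates bytes from arbitrary read() chunks and emits complete length-prefixed frames.
class FrameAccumulator {
    private static final int HEADER_LENGTH = 4; // the length field itself
    private final int maxFrameLength;
    private ByteBuffer cumulation = ByteBuffer.allocate(0);

    FrameAccumulator(int maxFrameLength) { this.maxFrameLength = maxFrameLength; }

    // Feed one chunk as it arrives from the socket; returns the frame bodies it completed.
    List<byte[]> feed(byte[] chunk) {
        // Append the new chunk to whatever was left over from previous reads.
        ByteBuffer merged = ByteBuffer.allocate(cumulation.remaining() + chunk.length);
        merged.put(cumulation).put(chunk);
        merged.flip();
        cumulation = merged;

        List<byte[]> frames = new ArrayList<>();
        while (cumulation.remaining() >= HEADER_LENGTH) {                  // rule 1: need a full header
            int bodyLength = cumulation.getInt(cumulation.position());     // peek without consuming
            if (bodyLength < 0 || bodyLength > maxFrameLength) {
                throw new IllegalStateException("bad frame length " + bodyLength);
            }
            if (cumulation.remaining() < HEADER_LENGTH + bodyLength) {     // rule 2: need the whole frame
                break;                                                     // wait for more data
            }
            cumulation.getInt();                                           // consume the length field
            byte[] body = new byte[bodyLength];
            cumulation.get(body);
            frames.add(body);
        }
        return frames;
    }
}
```

In Netty, ByteToMessageDecoder performs this accumulation and LengthFieldBasedFrameDecoder implements the loop, which is why a decoder built on it only has to turn each extracted frame into an object.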

Because this pattern is so common, Netty provides a generic decoder for it, LengthFieldBasedFrameDecoder, which buffers incoming bytes itself and extracts complete messages (frames) from the binary stream. The application only needs to convert each frame’s ByteBuf into a specific request object. NettyDecoder looks like this:

[Figure 14: RocketMQ’s NettyDecoder]

NettyEncoder does the reverse: it converts the request object into a ByteBuf, i.e. into a binary stream, by writing the object in a protocol format (Header + Body) like the one shown in the diagram above.
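A minimal encoder for a Header + Body layout might look like the following. The layout is an assumption chosen for the sketch, not RocketMQ’s exact wire format: a 4-byte total length (counting every byte after it), a 4-byte header length, the header bytes (a UTF-8 string standing in for serialized header fields), and the body. `Message` and `HeaderBodyCodec` are hypothetical names. A matching decode method shows that the header-length field is what lets the receiver split header from body.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical message: a textual header (e.g. serialized fields) plus a binary body.
class Message {
    final String header;
    final byte[] body;

    Message(String header, byte[] body) {
        this.header = header;
        this.body = body;
    }
}

class HeaderBodyCodec {
    // Layout: [total length:4][header length:4][header bytes][body bytes]
    // "total length" counts every byte after the first 4.
    static ByteBuffer encode(Message msg) {
        byte[] header = msg.header.getBytes(StandardCharsets.UTF_8);
        int totalLength = 4 + header.length + msg.body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + totalLength);
        buf.putInt(totalLength);
        buf.putInt(header.length);
        buf.put(header);
        buf.put(msg.body);
        buf.flip(); // ready to be written to the channel
        return buf;
    }

    // Decodes one frame without its total-length field, i.e. what a length-field
    // frame decoder hands over after stripping that field.
    static Message decode(ByteBuffer frame) {
        int headerLength = frame.getInt();
        byte[] header = new byte[headerLength];
        frame.get(header);
        byte[] body = new byte[frame.remaining()];
        frame.get(body);
        return new Message(new String(header, StandardCharsets.UTF_8), body);
    }
}
```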

Thread Isolation Mechanism #

In general, when the server receives a request, it decodes it into a request object and needs to process it based on the request object. To avoid blocking the IO read thread during business processing, additional thread pools, called business thread pools, are usually used for business processing. RocketMQ’s approach in this area is worth learning. It provides different thread pools for different businesses to implement thread isolation.

RocketMQ assigns each kind of request a request code, maps each request code to a Processor (its business logic), and registers each Processor together with a specific thread pool, which achieves thread isolation.

Step 1: First, during server startup, static registration is performed to associate the request handler with the executing thread pool. The code example is as follows:

[Figure 15: Registering request processors and thread pools at server startup]

Step 2: After the server receives the request object, it obtains the corresponding Processor and thread pool based on the request command, and then submits the task to the thread pool for execution. The code example is as follows (NettyRemotingAbstract#processRequestCommand).

[Figure 16: NettyRemotingAbstract#processRequestCommand]
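The registration and dispatch steps can be modeled with plain JDK executors. In this sketch the `Processor` interface, the `Dispatcher` class, the request codes used in the test, and the pool names are hypothetical. What it mirrors is the structure described above: a processorTable mapping each request code to a (processor, executor) pair, a default pair for unregistered codes, and a dispatch step that only submits work, so the calling IO thread returns immediately.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical business processor: turns a request body into a response body.
interface Processor {
    String process(String request);
}

class Dispatcher {
    // A processor and the pool that runs it (cf. Pair<NettyRequestProcessor, ExecutorService>).
    static final class Pair {
        final Processor processor;
        final ExecutorService executor;

        Pair(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    // Filled once at startup and only read afterwards, so a plain HashMap suffices.
    private final Map<Integer, Pair> processorTable = new HashMap<>();
    private Pair defaultRequestProcessor;

    void registerProcessor(int requestCode, Processor p, ExecutorService e) {
        processorTable.put(requestCode, new Pair(p, e));
    }

    void registerDefaultProcessor(Processor p, ExecutorService e) {
        defaultRequestProcessor = new Pair(p, e);
    }

    // Called on the IO thread: look up the pair and hand the work to its pool.
    CompletableFuture<String> processRequestCommand(int requestCode, String body) {
        Pair pair = processorTable.getOrDefault(requestCode, defaultRequestProcessor);
        if (pair == null) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("no processor for code " + requestCode));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> pair.processor.process(body), pair.executor);
    }

    // Fixed-size pool with recognizable thread names, e.g. "send-1".
    static ExecutorService namedPool(String prefix, int threads) {
        AtomicInteger n = new AtomicInteger();
        ThreadFactory tf = r -> new Thread(r, prefix + "-" + n.incrementAndGet());
        return Executors.newFixedThreadPool(threads, tf);
    }
}
```

Because each request code has its own pool, a backlog of slow message-pulling work cannot occupy the threads that serve message sending. A production server would also handle a saturated pool, for example by rejecting the request with a “busy” response rather than blocking the IO thread.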

That concludes this article. Using RocketMQ’s Netty code as a starting point, we have summarized the common patterns of Netty-based network programming.