26 Learning Gossip Protocol Implementation From Ping Pong Messages

Starting from this lesson, we will enter a new module: the “Redis Cluster” module. In this module, I will guide you through the key functionalities of Redis Cluster, including communication based on the Gossip protocol, key cluster commands, and the data migration mechanism.

These lessons serve two purposes. First, you will gain a deep understanding of how Redis maintains cluster relationships, forwards requests, and migrates data, which will help you troubleshoot cluster-related issues. Second, when you develop a distributed cluster yourself, you will inevitably face design questions about maintaining node information, placing data, and migrating it. The upcoming lessons walk through typical designs and implementations of these key mechanisms, such as the Gossip protocol and data migration, which you can draw on in your own work.

So, let’s start with the communication mechanism between nodes in Redis Cluster, whose core is the Gossip protocol. In today’s lesson, we will mainly explore how Redis implements the Gossip protocol.

Basic Mechanism of the Gossip Protocol #

For a distributed cluster to function properly, it must maintain the information and status of its nodes. There are typically two ways to do this. One is a centralized approach, which uses a third-party system such as ZooKeeper or etcd to store the information and status of cluster nodes. The other is a decentralized approach, in which each node maintains the information and status of the other nodes and uses the Gossip protocol to propagate updates among nodes, so that all nodes eventually hold consistent information.

The following diagram illustrates these two methods for maintaining cluster node information:

I introduced the working mechanism of the Gossip protocol in the first season of this course, in the lesson “Communication Overhead: Key Factors Limiting the Scale of Redis Cluster”, and you can review it there. Here, I will briefly recap its main mechanisms to help you understand how the Gossip protocol is designed and implemented at the source-code level.

In a cluster that uses the Gossip protocol, each cluster node maintains a copy of the cluster’s status information, including information about the nodes in the cluster, their running status, and the distribution of data among nodes.

For Redis, the cluster node information includes the node name, IP address, and port number, while the running status of a node is mainly represented by two timestamps: the time the node sends PING messages to other nodes and the time it receives PONG messages in return. Lastly, the data distribution in the cluster corresponds to the allocation of slots in Redis Cluster, which determines which slots are owned by each node.

When cluster nodes operate according to the Gossip protocol, each node randomly selects some other nodes from the cluster at a certain frequency and sends its own information and the known information of other nodes as PING messages to the selected nodes. When other nodes receive the PING messages, they respond with PONG messages, including their own information and the known information of other nodes. This process is illustrated in the following diagram:

The Gossip protocol propagates the node information throughout the cluster by randomly selecting communication nodes in this manner. When there is a change in the information maintained by a node, such as a change in the data layout, other nodes can also obtain this updated information after a few rounds of communication. This achieves the goal of maintaining a consistent state of information among all nodes in a distributed cluster.
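To make the propagation argument concrete, here is a small simulation sketch. It is not Redis code: `gossip_rounds`, `demo_rand`, and the round-based model are hypothetical. In each round every node contacts one random peer, and the Ping/Pong pair lets the two nodes share whatever either of them knew at the start of the round (push-pull gossip). The function reports how many rounds it takes for an update that starts at one node to reach all nodes.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical tiny LCG so that runs are reproducible across platforms. */
static unsigned int demo_rand(unsigned int *state) {
    *state = *state * 1664525u + 1013904223u;
    return *state >> 16; /* the high bits of an LCG are better distributed */
}

/* Simulates push-pull gossip among n nodes. Returns the number of rounds
 * until every node knows an update that starts at node 0, or -1 if that
 * does not happen within max_rounds (or memory allocation fails). */
int gossip_rounds(int n, unsigned int seed, int max_rounds) {
    assert(n > 0);
    if (n == 1) return 0;

    unsigned char *cur = calloc((size_t)n, 1);  /* knowledge at round start */
    unsigned char *next = calloc((size_t)n, 1); /* knowledge at round end */
    if (cur == NULL || next == NULL) {
        free(cur);
        free(next);
        return -1;
    }
    cur[0] = 1; /* node 0 is the first to learn about the change */

    unsigned int state = seed;
    int informed = 1, rounds = 0;
    while (informed < n && rounds < max_rounds) {
        memcpy(next, cur, (size_t)n);
        for (int i = 0; i < n; i++) {
            /* Pick a random peer j != i, like choosing a Ping target. */
            int j = (int)(demo_rand(&state) % (unsigned int)(n - 1));
            if (j >= i) j++;
            /* The Ping carries i's view to j; the Pong carries j's view back. */
            if (cur[i] || cur[j]) next[i] = next[j] = 1;
        }
        memcpy(cur, next, (size_t)n);
        informed = 0;
        for (int i = 0; i < n; i++) informed += cur[i];
        rounds++;
    }

    free(cur);
    free(next);
    return informed == n ? rounds : -1;
}
```

For example, `gossip_rounds(100, 42u, 1000)` returns the round count for a 100-node cluster. Every informed node keeps spreading the update, so the count tends to grow roughly logarithmically with the number of nodes rather than linearly. This is why a few rounds are usually enough.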

Now that you have understood the basic working mechanism of the Gossip protocol, let’s move on to learning how Redis implements the Gossip protocol.

How does Redis implement Gossip communication? #

First, you should know that the main functionality of Redis Cluster is defined and implemented in two files: cluster.h and cluster.c. If you want to dig further into the source code, focus on these two files.

Next, let’s look at the messages used in communication within Redis Cluster. These messages form the basis of Gossip protocol communication.

What are some common messages used in node communication? #

In the Redis source code, the messages exchanged between nodes are defined through macros in the cluster.h file. The code below lists a few common message types, including the Ping message, which a node uses to send information to other nodes, and the Pong message, which is a reply to Ping messages. The Meet message is used by a node to indicate its desire to join the cluster, while the Fail message represents a failed node. If you want to learn about more message types, you can further read the cluster.h file.

#define CLUSTERMSG_TYPE_PING 0 // Ping message, used by a node to send its information to other nodes
#define CLUSTERMSG_TYPE_PONG 1 // Pong message, reply to Ping message
#define CLUSTERMSG_TYPE_MEET 2 // Meet message, a node indicating its desire to join the cluster
#define CLUSTERMSG_TYPE_FAIL 3 // Fail message, indication of a failed node

Now that we have covered the message types used for node communication, let’s look at how these messages are structured in the Redis source code. These structures are also defined in the cluster.h file.

Redis defines a structure called clusterMsg to represent a message exchanged between nodes. It contains the sending node’s name, IP address, cluster communication port, and the slots it is responsible for, as well as the message type, the message length, and the actual message body. The code snippet below shows the important parts of the clusterMsg definition.

typedef struct {
   ...
   uint32_t totlen; // Message length
   uint16_t type; // Message type
   ...
   char sender[CLUSTER_NAMELEN]; // Name of the node sending the message
   unsigned char myslots[CLUSTER_SLOTS/8]; // Bitmap of the slots the sending node is responsible for
   char myip[NET_IP_STR_LEN]; // IP of the sending node
   uint16_t cport; // Communication port of the sending node
   ...
   union clusterMsgData data; // Message body
} clusterMsg;
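The myslots field is a bitmap with one bit per hash slot, so Redis Cluster's 16384 slots fit in 16384 / 8 = 2048 bytes. The sketch below shows the byte and bit arithmetic such a bitmap needs. The `demo_` names are hypothetical. The sketch is meant to illustrate the same idea as the bitmap helpers in cluster.c (such as bitmapTestBit and bitmapSetBit), not to reproduce them.

```c
#include <assert.h>
#include <string.h>

#define DEMO_CLUSTER_SLOTS 16384 /* number of hash slots in Redis Cluster */

/* One bit per slot, like clusterMsg.myslots: 16384 / 8 = 2048 bytes. */
typedef struct {
    unsigned char bits[DEMO_CLUSTER_SLOTS / 8];
} demo_slot_bitmap;

/* Clear every slot bit. */
void demo_slot_reset(demo_slot_bitmap *bm) {
    memset(bm->bits, 0, sizeof bm->bits);
}

/* Mark slot pos as owned: byte pos/8, bit pos%8 within that byte. */
void demo_slot_set(demo_slot_bitmap *bm, int pos) {
    assert(pos >= 0 && pos < DEMO_CLUSTER_SLOTS);
    bm->bits[pos / 8] |= (unsigned char)(1u << (pos & 7));
}

/* Return 1 if slot pos is marked as owned, 0 otherwise. */
int demo_slot_test(const demo_slot_bitmap *bm, int pos) {
    assert(pos >= 0 && pos < DEMO_CLUSTER_SLOTS);
    return (bm->bits[pos / 8] & (1u << (pos & 7))) != 0;
}

/* Count how many slots the bitmap claims. */
int demo_slot_count(const demo_slot_bitmap *bm) {
    int count = 0;
    for (int pos = 0; pos < DEMO_CLUSTER_SLOTS; pos++)
        count += demo_slot_test(bm, pos);
    return count;
}
```

A receiver can test bit s of the sender's myslots to check whether the sender claims slot s.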

From the clusterMsg data structure, we can see that it contains a union structure called clusterMsgData, which represents the actual message body for node communication.

In the cluster.h file, we can see the definition of clusterMsgData. It includes various data structures corresponding to different message types, such as clusterMsgDataGossip, clusterMsgDataFail, clusterMsgDataPublish, and clusterMsgDataUpdate. These data structures correspond to the message bodies of different message types. See the code snippet below.

union clusterMsgData {
    // Data structure corresponding to Ping, Pong, and Meet message types
    struct {
        clusterMsgDataGossip gossip[1];
    } ping;

    // Data structure corresponding to Fail message type
    struct {
        clusterMsgDataFail about;
    } fail;

    // Data structure corresponding to Publish message type
    struct {
        clusterMsgDataPublish msg;
    } publish;

    // Data structure corresponding to Update message type
    struct {
        clusterMsgDataUpdate nodecfg;
    } update;

    // Data structure corresponding to Module message type
    struct {
        clusterMsgModule msg;
    } module;
};

Within this union structure, let’s focus on the clusterMsgDataGossip data structure, as it corresponds to the message bodies of Ping, Pong, and Meet messages used in the Gossip protocol communication. The clusterMsgDataGossip data structure is defined as follows:

typedef struct {
    char nodename[CLUSTER_NAMELEN]; // Node name
    uint32_t ping_sent; // Timestamp when the node sends a Ping message
    uint32_t pong_received; // Timestamp when the node receives a Pong message
    char ip[NET_IP_STR_LEN]; // Node IP
    uint16_t port; // Communication port of the node with clients
    uint16_t cport; // Port used by the node for cluster communication
    uint16_t flags; // Copy of the node's flags
    uint32_t notused1; // Unused field
} clusterMsgDataGossip;
This data structure encompasses the necessary information for the Gossip protocol communication, including the node name, timestamps for Ping and Pong messages, IP of the node, and the communication ports.
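In clusterMsgData, gossip is declared with length 1, but it carries as many entries as the sender writes. The size of a Ping or Pong message therefore depends on the number of entries. Below is a minimal sketch of that calculation. It assumes heavily simplified stand-in structs (`demo_gossip`, `demo_msg`) rather than the real Redis definitions. The fixed part is the whole message minus the body union, and one gossip entry is added for each node.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-ins for clusterMsgDataGossip and clusterMsg. */
typedef struct {
    char nodename[40];
    uint32_t ping_sent;
    uint32_t pong_received;
    char ip[46];
    uint16_t port;
    uint16_t cport;
    uint16_t flags;
    uint32_t notused1;
} demo_gossip;

union demo_msg_data {
    struct {
        demo_gossip gossip[1]; /* declared length 1, used as variable length */
    } ping;
};

typedef struct {
    uint32_t totlen;
    uint16_t type;
    uint16_t count; /* number of gossip entries in the body */
    union demo_msg_data data;
} demo_msg;

/* Bytes needed for a Ping/Pong carrying `count` gossip entries:
 * the fixed header plus one demo_gossip per entry. */
size_t demo_ping_len(size_t count) {
    size_t header = sizeof(demo_msg) - sizeof(union demo_msg_data);
    return header + count * sizeof(demo_gossip);
}
```

For n entries, a sender would allocate at least `demo_ping_len(n)` bytes, and never less than `sizeof(demo_msg)`, before filling gossip[0] through gossip[n-1].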

The Ping message body lives in the ping member of clusterMsgData. For a Ping or Pong message, this member is an array of clusterMsgDataGossip entries, with one entry for each node the sender gossips about:

union clusterMsgData {
    struct {
        // When the message is Ping or Pong, use an array of type clusterMsgDataGossip
        clusterMsgDataGossip gossip[1];
    } ping;
    ...
};

When building the Ping message body, clusterSendPing creates one clusterMsgDataGossip entry for each node it selects and copies that node’s basic information into it. The copying is done by the clusterSetGossipEntry function, which fills the i-th entry of the gossip array in place:

void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
    clusterMsgDataGossip *gossip;
    gossip = &(hdr->data.ping.gossip[i]); // The i-th entry of the message body
    memcpy(gossip->nodename, n->name, CLUSTER_NAMELEN);
    gossip->ping_sent = htonl(n->ping_sent/1000);         // Milliseconds -> seconds
    gossip->pong_received = htonl(n->pong_received/1000); // Milliseconds -> seconds
    memcpy(gossip->ip, n->ip, sizeof(n->ip));
    gossip->port = htons(n->port);
    gossip->cport = htons(n->cport);
    gossip->flags = htons(n->flags);
    gossip->notused1 = 0;
}

Although gossip is declared with a length of 1, clusterSendPing allocates the message buffer with enough room for all the entries it plans to write, so the array is indexed beyond its declared size. Multi-byte fields are converted to network byte order with htonl and htons. The timestamps are kept in milliseconds in clusterNode but are stored in seconds in the message.

So, when the clusterSendPing function constructs the Ping message, it writes the information of multiple nodes into the Ping message. So, how many nodes’ information does the clusterSendPing function actually write? This is actually controlled by three variables: freshnodes, wanted, and maxiterations.

First, freshnodes equals the number of cluster nodes minus 2, which excludes the current node itself and the target node that will receive the Ping:

int freshnodes = dictSize(server.cluster->nodes) - 2;

The value of wanted also depends on freshnodes. It starts at one-tenth of the number of cluster nodes, rounded down. If that is less than 3, wanted is set to 3, and if it is greater than freshnodes, wanted is capped at freshnodes. The calculation is shown below:

wanted = floor(dictSize(server.cluster->nodes) / 10);
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;

Once we have the value of wanted, the value of maxiterations is set to three times the size of wanted.

int maxiterations = wanted * 3;
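The three values can be computed with a small helper. This is a sketch of the arithmetic described above, not Redis code; `demo_compute_budget` and `demo_gossip_budget` are hypothetical names.

```c
#include <assert.h>

/* Hypothetical container for the three values clusterSendPing computes. */
typedef struct {
    int freshnodes;    /* nodes we could gossip about: all minus self and target */
    int wanted;        /* how many gossip entries we aim to include */
    int maxiterations; /* cap on random picks, so the loop always terminates */
} demo_gossip_budget;

demo_gossip_budget demo_compute_budget(int cluster_size) {
    assert(cluster_size >= 2);
    demo_gossip_budget b;
    b.freshnodes = cluster_size - 2;
    b.wanted = cluster_size / 10; /* integer division acts as floor here */
    if (b.wanted < 3) b.wanted = 3;
    if (b.wanted > b.freshnodes) b.wanted = b.freshnodes;
    b.maxiterations = b.wanted * 3;
    return b;
}
```

For a 100-node cluster this gives freshnodes = 98, wanted = 10, and maxiterations = 30. For a 6-node cluster, wanted is raised to the minimum of 3. For a 3-node cluster, wanted is capped at freshnodes = 1.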

After computing freshnodes, wanted, and maxiterations, clusterSendPing enters a loop bounded by these three variables. In each iteration, it randomly selects a node from the cluster and calls clusterSetGossipEntry to fill in the next entry of the Ping message body, which is of type clusterMsgDataGossip. You can read the source code of clusterSetGossipEntry to see exactly which fields it sets.

However, clusterSendPing does not add an entry in this loop if the selected node is the current node itself, a potentially faulty node, a node that is still in the handshake phase, a disconnected node that is not responsible for any slots, or a node without address information.

The following code shows the basic logic of clusterSendPing for setting the Ping message body:

while (freshnodes > 0 && gossipcount < wanted && maxiterations--) {
    dictEntry *de = dictGetRandomKey(server.cluster->nodes);
    clusterNode *this = dictGetVal(de);
    ...
    clusterSetGossipEntry(hdr, gossipcount, this); // Call clusterSetGossipEntry to set the Ping message body
    freshnodes--;
    gossipcount++;
}

Note that potentially faulty nodes are not dropped. After this loop, clusterSendPing adds their information at the end of the Ping message body.
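A minimal sketch of this selection logic follows, under stated assumptions. `demo_node`, the `DEMO_NODE_*` flag values, and the LCG-based `demo_pick` are hypothetical stand-ins for clusterNode, the CLUSTER_NODE_* flags, and dictGetRandomKey. The sketch records node indexes instead of filling clusterMsgDataGossip entries. It also assumes two details based on the Redis 5.0 source: a disconnected node is skipped only when it serves no slots, and a node already in the gossip section is not added twice.

```c
#include <assert.h>

#define DEMO_NODE_PFAIL     (1 << 0) /* hypothetical flag values */
#define DEMO_NODE_HANDSHAKE (1 << 1)
#define DEMO_NODE_NOADDR    (1 << 2)

typedef struct {
    int flags;
    int connected; /* 1 if there is a link to this node */
    int numslots;  /* number of slots this node is responsible for */
} demo_node;

/* Hypothetical random index in [0, n) from a small LCG. */
static unsigned int demo_pick(unsigned int *state, int n) {
    *state = *state * 1664525u + 1013904223u;
    return (*state >> 16) % (unsigned int)n;
}

static int demo_in_section(const int *out, int count, int idx) {
    for (int k = 0; k < count; k++)
        if (out[k] == idx) return 1;
    return 0;
}

/* Writes the indexes of the nodes to gossip about into out[] (capacity n).
 * PFAIL nodes are appended after the random picks. Returns the entry count. */
int demo_select_gossip(const demo_node *nodes, int n, int myself,
                       int wanted, unsigned int seed, int *out) {
    int freshnodes = n - 2;
    int maxiterations = wanted * 3;
    int count = 0;
    unsigned int state = seed;

    while (freshnodes > 0 && count < wanted && maxiterations--) {
        int idx = (int)demo_pick(&state, n);
        const demo_node *node = &nodes[idx];
        if (idx == myself) continue;                 /* header already describes us */
        if (node->flags & DEMO_NODE_PFAIL) continue; /* appended at the end instead */
        if ((node->flags & (DEMO_NODE_HANDSHAKE | DEMO_NODE_NOADDR)) ||
            (!node->connected && node->numslots == 0)) {
            freshnodes--;
            continue;
        }
        if (demo_in_section(out, count, idx)) continue; /* no duplicates */
        out[count++] = idx;
        freshnodes--;
    }

    /* Append the potentially faulty (PFAIL) nodes at the end of the body. */
    for (int idx = 0; idx < n; idx++) {
        const demo_node *node = &nodes[idx];
        if (node->flags & (DEMO_NODE_HANDSHAKE | DEMO_NODE_NOADDR)) continue;
        if (!(node->flags & DEMO_NODE_PFAIL)) continue;
        out[count++] = idx;
    }
    return count;
}
```

Because PFAIL nodes bypass the random loop, every Ping carries the sender's current failure suspicions, whatever the random picks happened to be.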

Step 3: Sending the Ping Message

With the Ping message body built, the last step in the main logic of clusterSendPing is to call clusterSendMessage to send the Ping message to the randomly selected target node. This completes the operation the Gossip protocol requires: sending the current node’s information to a randomly selected node.

I have created the following diagram to show the main logic of the clusterSendPing function. You can review it again.

Next, let’s take a look at the processing of the Ping message and the reply of the Pong message when a node receives the Ping message.

Handling the Ping Message and Replying with the Pong Message #

In the clusterCron function, before a node calls clusterSendPing to send a Ping message to other nodes, it checks its connection to each of them. If a connection is down, the node re-establishes it, as shown below:

void clusterCron(void) {
    ...
    di = dictGetSafeIterator(server.cluster->nodes);
    while ((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        ...
        if (node->link == NULL) { // No connection to this node yet, or it was lost
            ...
            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->cport, NET_FIRST_BIND_ADDR);
            ...
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            // Register clusterReadHandler to handle data arriving on this connection
            aeCreateFileEvent(server.el, link->fd, AE_READABLE, clusterReadHandler, link);
            ...
        }
    }
    ...
}

From the code, we can see that the read handler registered on each connection to another node is clusterReadHandler. Therefore, when a node receives a Ping message, the message is processed in clusterReadHandler. Let’s take a look at this function.

The clusterReadHandler function reads incoming data in a while(1) loop. Once it has read a complete message, it calls clusterProcessPacket to process that message, as shown below:

void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    while(1) { // Keep reading as long as there is data
        rcvbuflen = sdslen(link->rcvbuf);
        ...
        nread = read(fd, buf, readlen); // Read the received data
        ...
        // A complete message has been read
        if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
            // Call clusterProcessPacket to process the message
            if (clusterProcessPacket(link)) {
                ... // Reset the receive buffer for the next message
            } else {
                return; // The link is no longer valid
            }
        }
    }
}
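The completeness check relies on the message header layout. clusterMsg starts with a 4-byte signature followed by the 4-byte totlen field, so once 8 bytes have arrived, the receiver knows the full message length. The sketch below shows this length-prefixed framing with a hypothetical fixed-size buffer (`demo_rcvbuf`) instead of an sds string. clusterReadHandler sizes each read so that it never reads past the current message. This sketch, by contrast, accepts arbitrary chunks and splits them into messages itself.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define DEMO_RCVBUF_CAP 4096

/* Hypothetical receive buffer for one connection. */
typedef struct {
    unsigned char buf[DEMO_RCVBUF_CAP];
    size_t len;
} demo_rcvbuf;

/* Reads the 32-bit total length stored in bytes 4..7 (after the 4-byte
 * signature) and converts it from network byte order. */
static uint32_t demo_totlen(const unsigned char *p) {
    uint32_t be;
    memcpy(&be, p + 4, sizeof be);
    return ntohl(be);
}

/* Appends n received bytes and consumes every complete message.
 * Returns the number of complete messages found, or -1 on a bad length. */
int demo_feed(demo_rcvbuf *rb, const unsigned char *data, size_t n) {
    int messages = 0;
    while (n > 0) {
        size_t room = DEMO_RCVBUF_CAP - rb->len;
        size_t chunk = n < room ? n : room;
        if (chunk == 0) return -1; /* defensive: buffer unexpectedly full */
        memcpy(rb->buf + rb->len, data, chunk);
        rb->len += chunk;
        data += chunk;
        n -= chunk;

        /* 8 bytes are needed before the full message length is known. */
        while (rb->len >= 8) {
            uint32_t totlen = demo_totlen(rb->buf);
            if (totlen < 8 || totlen > DEMO_RCVBUF_CAP) return -1;
            if (rb->len < totlen) break; /* message not complete yet */
            messages++; /* a real node would process the packet here */
            memmove(rb->buf, rb->buf + totlen, rb->len - totlen);
            rb->len -= totlen;
        }
    }
    return messages;
}
```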

Because the messages sent between nodes are not only Ping messages, the clusterProcessPacket function first reads the message type from the received message header and then executes different code branches based on different message types.

When a Ping message is received, the clusterProcessPacket function first calls the clusterSendPing function to return a Pong message to the node that sent the Ping message, as shown below:

int clusterProcessPacket(clusterLink *link) {
    ...
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        ...
        // For a Meet message, add the sending node to the locally recorded node list
        ...
        clusterSendPing(link, CLUSTERMSG_TYPE_PONG); // Call clusterSendPing to reply with a Pong message
    }
    ...
}

This shows that Ping and Pong messages are generated and sent by the same function, clusterSendPing, so they carry the same kind of content. A Pong message therefore also contains the information of the node sending it and of the other nodes it knows about. As a result, the node that sent the Ping can learn the latest information about other nodes from the Pong. Through multiple rounds of such exchanges, the Gossip protocol brings all nodes to consistent information.

Note also that the same branch of clusterProcessPacket handles two cases: the target node processing a received Ping, and the original sender processing the Pong that comes back. This branch, for example, updates the time the latest Pong was received and updates the local slot information based on the slot distribution in the message header. In addition, clusterProcessPacket calls clusterProcessGossipSection to process, one by one, the gossip entries carried in the body of the Ping or Pong message.

In this way, a node that receives a Ping or Pong message can update its locally recorded information about the corresponding nodes based on the message body. You can read the source code of clusterProcessGossipSection to see exactly how it updates the local node information from the message body.

The following code shows how a node updates its local information after receiving a Ping or Pong message:

int clusterProcessPacket(clusterLink *link) {
    ...
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        ...
        // On receiving a Pong, record when the latest Pong arrived from this node
        if (link->node && type == CLUSTERMSG_TYPE_PONG) {
            link->node->pong_received = mstime();
            ...
        }
        ...
        // If the sender is a master, update the local slot distribution information
        ...
        // Call clusterProcessGossipSection to process the body of the Ping or Pong message
        if (sender) clusterProcessGossipSection(hdr, link);
    }
    ...
}
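Here is a simplified sketch of merging a gossip section into the local view. All names are hypothetical, and the update rules are deliberately reduced. The real clusterProcessGossipSection also handles failure reports and address changes, and for an unknown node it starts a handshake (clusterStartHandshake) rather than simply recording it. The sketch keeps the newest pong_received it has seen for each known node and marks unknown nodes as pending a handshake.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define DEMO_NAMELEN 40
#define DEMO_MAX_NODES 64

typedef struct {
    char name[DEMO_NAMELEN];
    uint32_t pong_received; /* seconds, as carried in gossip entries */
    int handshake;          /* 1 = learned from gossip, not yet verified */
} demo_known_node;

typedef struct {
    demo_known_node nodes[DEMO_MAX_NODES];
    int count;
} demo_view;

typedef struct {
    char nodename[DEMO_NAMELEN];
    uint32_t pong_received;
} demo_gossip_entry;

static demo_known_node *demo_lookup(demo_view *v, const char *name) {
    for (int i = 0; i < v->count; i++)
        if (memcmp(v->nodes[i].name, name, DEMO_NAMELEN) == 0) return &v->nodes[i];
    return NULL;
}

/* Merges the gossip entries of a received Ping/Pong into the local view.
 * Returns the number of previously unknown nodes that were added. */
int demo_process_gossip(demo_view *v, const demo_gossip_entry *g, int count) {
    int added = 0;
    for (int i = 0; i < count; i++) {
        demo_known_node *n = demo_lookup(v, g[i].nodename);
        if (n != NULL) {
            /* Keep the most recent evidence that the node is alive. */
            if (g[i].pong_received > n->pong_received)
                n->pong_received = g[i].pong_received;
        } else if (v->count < DEMO_MAX_NODES) {
            /* Unknown node: record it and mark it as pending a handshake. */
            demo_known_node *nn = &v->nodes[v->count++];
            memcpy(nn->name, g[i].nodename, DEMO_NAMELEN);
            nn->pong_received = g[i].pong_received;
            nn->handshake = 1;
            added++;
        }
    }
    return added;
}
```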

So far, we have walked through the overall processing of the Ping and Pong messages exchanged under the Gossip protocol, and seen the main data structures and functions Redis uses to implement it. I have drawn two tables that summarize the data structures and functions introduced above, which you can use to review them.

Summary #

In today’s class, I introduced to you the design and implementation of the Gossip protocol used in Redis Cluster. The key to implementing the Gossip protocol lies in two aspects. One is to send Ping-Pong messages to convey the information of the sending node itself and the information of other known nodes. To achieve this, Redis has designed the clusterMsg structure, in which the message header contains the information of the sending node, such as its name, IP address, port number, and slot distribution.

The message body in the clusterMsg structure is designed to use an array of type clusterMsgDataGossip, where each element of the array corresponds to the information of a known node of the sending node. In this way, the sending node can propagate its own information and the information of known nodes through Ping messages.

Similarly, the receiving node of a Ping message will also use the same structure to return its own information and the information of other known nodes to the sending node through a Pong message. This fulfills the requirements of the Gossip protocol.

The other key aspect of implementing the Gossip protocol is randomly selecting the nodes to communicate with, which is relatively simple in the Redis Cluster source code. The clusterCron function first randomly selects five nodes. From these, it picks as the target the node whose last Pong message was received longest ago, that is, the one with the smallest pong_received. This also meets the requirements of the Gossip protocol.
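The target selection in clusterCron can be sketched as follows. `demo_peer` and the LCG-based `demo_rand_index` are hypothetical stand-ins for clusterNode and dictGetRandomKey. The filters are assumptions based on the Redis 5.0 source: skip the node itself, nodes without a connection, and nodes whose Ping is still awaiting a Pong.

```c
#include <assert.h>
#include <stdint.h>

typedef struct {
    int64_t pong_received; /* ms timestamp of the last Pong from this node */
    int64_t ping_sent;     /* 0 if no Ping is currently awaiting a Pong */
    int connected;         /* 1 if there is a link to this node */
    int is_myself;         /* 1 for the local node */
} demo_peer;

/* Hypothetical random index in [0, n) from a small LCG. */
static unsigned int demo_rand_index(unsigned int *state, int n) {
    *state = *state * 1664525u + 1013904223u;
    return (*state >> 16) % (unsigned int)n;
}

/* Samples 5 random nodes and returns the index of the one whose last Pong
 * is oldest, skipping ineligible nodes. Returns -1 if none qualifies. */
int demo_pick_ping_target(const demo_peer *peers, int n, unsigned int seed) {
    assert(n > 0);
    unsigned int state = seed;
    int best = -1;
    for (int k = 0; k < 5; k++) {
        int idx = (int)demo_rand_index(&state, n);
        const demo_peer *p = &peers[idx];
        if (p->is_myself || !p->connected || p->ping_sent != 0) continue;
        if (best == -1 || p->pong_received < peers[best].pong_received)
            best = idx;
    }
    return best;
}
```

Sampling only five nodes keeps each check cheap. Preferring the oldest pong_received among them steers Pings toward nodes that have not been heard from for a while.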

Through today’s class, I hope you have gained an understanding of the message structure designed for Redis Cluster, as well as the overall execution logic of periodic Ping and Pong message sending. These are classic reference designs that you can use when developing your own Gossip protocol.

One Question Per Lesson #

Based on the source code introduced in this lesson, do you know why the clusterSendPing function sets wanted to one-tenth of the total number of cluster nodes?