25 Role of PubSub During Master-Slave Failover Switch #

In the previous two lessons, we learned about the basic process of how Sentinels work: Sentinels use the sentinelRedisInstance structure to keep track of the master node’s information, and within this structure, they also keep track of other Sentinels listening to the same master node. So how does a Sentinel obtain the information of other Sentinels?

This is actually related to the communication method of Publish/Subscribe (Pub/Sub) that Sentinels use during runtime. The Pub/Sub communication method allows Sentinels to subscribe to one or more channels, and when there is a message in the channel, the Sentinel can receive the corresponding message. At the same time, the Sentinel can also publish its own generated message to the channel, so that other clients subscribed to the channel can receive the message.

In today’s lesson, I will introduce you to the implementation of the Pub/Sub communication method and its application in Sentinel’s work. You will also learn how Sentinels discover each other and how clients know when a failover is completed. The Pub/Sub communication method can be used for many-to-many information exchange in distributed systems. After completing this lesson, you will be able to apply it when implementing communication between distributed nodes.

Alright, next, let’s take a look at the implementation of the Pub/Sub communication method.

Implementation of the Publish-Subscribe Communication Method #

The basic model of the Publish-Subscribe communication method consists of publishers, channels, and subscribers. Publishers publish messages to channels, and subscribers subscribe to channels. Once there is a message on a channel, the channel sends the message to the subscribers. A channel can have multiple subscribers, and a subscriber can subscribe to multiple channels to receive messages from multiple publishers.

The following diagram shows the basic model of publisher-channel-subscriber:

Implementation of Channels #

After understanding the basic model of the Publish-Subscribe method, let’s take a look at how channels are implemented because channels are crucial for communication between publishers and subscribers.

In Redis, the global variable server uses a member variable called pubsub_channels to store channels. The initialization of pubsub_channels is done in the initServer function (in the file server.c). The initServer function calls dictCreate to create a hash table of type keylistDictType, and uses this hash table to store channel information, as shown below:

void initServer(void) {
    
    server.pubsub_channels = dictCreate(&keylistDictType, NULL);
    
}

Note that when the hash table is of type keylistDictType, the value of each hash entry is a list. The reason for using this type to store channel information is that Redis treats the channel name as the key of the hash entry, and the subscribers who subscribe to the channel as the value of the hash entry. As we mentioned earlier, a channel can have multiple subscribers, so Redis uses a list to store the subscribers who subscribe to the same channel.

The following diagram illustrates the structure of the pubsub_channels hash table, which stores channels and subscribers:

After understanding how channels are implemented, let’s separately examine the implementations of the publish command and the subscribe command.

Implementation of the Publish Command #

In Redis, messages are published with the publish command. As I introduced in [Lesson 14], when the Redis server is initialized, it sets up a command table called redisCommandTable, which records the commands Redis supports and their corresponding implementation functions.

This command table is defined in the server.c file. To locate the specific implementation function of a Redis command, a shortcut is to search for the corresponding command in this table, which allows you to locate the implementation function of the command. We can use this method to locate the publish command and see that its corresponding implementation function is publishCommand (in the pubsub.c file), as shown below:

struct redisCommand redisCommandTable[] = {
    
    {"publish", publishCommand, 3, "pltF", 0, NULL, 0, 0, 0, 0, 0},
    
};

Let’s take a look at the publishCommand function, which calls the pubsubPublishMessage function (in the pubsub.c file) to send the actual message, and then returns the number of subscribers who receive the message, as shown below:

void publishCommand(client *c) {
    // Call pubsubPublishMessage to publish the message
    int receivers = pubsubPublishMessage(c->argv[1], c->argv[2]);
    // If Redis is running in cluster mode, send the publish command in the cluster
    addReplyLongLong(c, receivers); // Return the number of subscribers who receive the message
}

Regarding the pubsubPublishMessage function, its prototype is as follows. You can see that its two parameters are the channel to publish the message to and the specific message to publish.

int pubsubPublishMessage(robj *channel, robj *message)

The pubsubPublishMessage function looks up the channel to be published in the server.pubsub_channels hash table. If it finds the channel, it iterates through the list of subscribers corresponding to that channel and sends the message to each subscriber. In this way, as long as a subscriber subscribes to a channel, it will receive the message when the publisher publishes a message.

// Look up if the channel exists
de = dictFind(server.pubsub_channels, channel);
if (de) { // If the channel exists

    // Iterate through the subscribers corresponding to the channel and send the message to the subscribers
    while ((ln = listNext(&li)) != NULL) {
        client *c = ln->value;

        addReplyBulk(c, channel);
        addReplyBulk(c, message);
        receivers++;
    }
}
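To make the lookup-then-fan-out logic concrete, here is a minimal stand-alone sketch. It is not the Redis code: the types subscriber, channel, and channel_table and the functions find_channel, attach, and publish are hypothetical names, a small fixed-size array stands in for the hash table, and "delivery" just updates a counter instead of writing a reply to a client.

```c
#include <stddef.h>
#include <string.h>

#define MAX_CHANNELS    8
#define MAX_SUBSCRIBERS 8

/* Hypothetical subscriber: counts deliveries and remembers the last message. */
typedef struct subscriber {
    int received;
    const char *last_message;
} subscriber;

/* One table entry: a channel name plus the subscribers attached to it. */
typedef struct channel {
    const char *name;
    subscriber *subs[MAX_SUBSCRIBERS];
    int nsubs;
} channel;

typedef struct channel_table {
    channel entries[MAX_CHANNELS];
    int nentries;
} channel_table;

/* Linear lookup standing in for dictFind(); returns NULL if the channel is absent. */
channel *find_channel(channel_table *t, const char *name) {
    for (int i = 0; i < t->nentries; i++)
        if (strcmp(t->entries[i].name, name) == 0) return &t->entries[i];
    return NULL;
}

/* Attach a subscriber, creating the channel on first use.
 * Returns 0 on success, -1 if the table or the subscriber list is full. */
int attach(channel_table *t, const char *name, subscriber *s) {
    channel *ch = find_channel(t, name);
    if (ch == NULL) {
        if (t->nentries == MAX_CHANNELS) return -1;
        ch = &t->entries[t->nentries++];
        ch->name = name;
        ch->nsubs = 0;
    }
    if (ch->nsubs == MAX_SUBSCRIBERS) return -1;
    ch->subs[ch->nsubs++] = s;
    return 0;
}

/* Deliver the message to every subscriber of the channel and
 * return how many subscribers received it (0 if the channel is unknown). */
int publish(channel_table *t, const char *name, const char *message) {
    int receivers = 0;
    channel *ch = find_channel(t, name);
    if (ch != NULL) {
        for (int i = 0; i < ch->nsubs; i++) {
            ch->subs[i]->received++;
            ch->subs[i]->last_message = message;
            receivers++;
        }
    }
    return receivers;
}
```

With two subscribers attached to a channel named "news", publish(&t, "news", "hello") returns 2, while publishing to a channel nobody subscribed to returns 0. This mirrors the receivers count that publishCommand sends back to the publisher.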
 
 Okay, now that we understand the implementation of the publish command, let's take a look at the implementation of the subscribe command.
 
Implementation of the Subscribe Command #
 
 Similar to how we find the implementation function for the publish command, we can find the implementation function for the subscribe command, **subscribeCommand** (in the pubsub.c file), in the redisCommandTable.
 
 The logic of the subscribeCommand function is quite simple. It directly calls the pubsubSubscribeChannel function (also in the pubsub.c file) to perform the subscribe operation. Here's the code snippet:
 
 ```c
 void subscribeCommand(client *c) {
     int j;
     for (j = 1; j < c->argc; j++)
         pubsubSubscribeChannel(c, c->argv[j]);
     c->flags |= CLIENT_PUBSUB;
 }
 ```
 
 From the code, you can see that the subscribeCommand function takes a client variable as its parameter. It loops from index 1 up to the client's argc member variable and passes each corresponding element of argv to the pubsubSubscribeChannel function.
 
 The client's argc and argv members hold the number of arguments and the argument values of the command being executed. So what do these argument values refer to?
 
 In fact, we can find out by looking at the prototype of the pubsubSubscribeChannel function, shown below:
 
 ```c
 int pubsubSubscribeChannel(client *c, robj *channel)
 ```
 
 The pubsubSubscribeChannel function takes a client variable as its parameter and also **receives information about the channel**. This means that subscribeCommand subscribes to each channel named in the arguments of the subscribe command. The example below executes the subscribe command so that the client subscribes to three channels: channel1, channel2, and channel3.
 
 ```
 subscribe channel1 channel2 channel3
 ```
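 To illustrate why the loop in subscribeCommand starts at index 1, here is a small sketch. It assumes the command arrives as a space-separated line, which is a simplification: real Redis fills c->argv by parsing its wire protocol. The helpers split_args and collect_channels are hypothetical names used only for this illustration.

```c
#include <string.h>

#define MAX_ARGS 16

/* Split a command line on spaces, in place. Returns argc.
 * Simplification: real Redis builds argv from its wire protocol instead. */
int split_args(char *line, char *argv[], int max_args) {
    int argc = 0;
    for (char *tok = strtok(line, " "); tok != NULL && argc < max_args;
         tok = strtok(NULL, " "))
        argv[argc++] = tok;
    return argc;
}

/* Same loop shape as subscribeCommand: argv[0] is the command name,
 * so the channel names are argv[1] .. argv[argc-1].
 * Returns the number of channel names collected. */
int collect_channels(int argc, char *argv[], const char *channels[]) {
    int n = 0;
    for (int j = 1; j < argc; j++)
        channels[n++] = argv[j];
    return n;
}
```

 For the line "subscribe channel1 channel2 channel3", split_args yields an argc of 4, and collect_channels picks up the three channel names while skipping argv[0], which holds the command name itself.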
 
 Now let's take a closer look at the implementation of the pubsubSubscribeChannel function. The logic of this function is also quite clear and can be divided into three steps.
 
 **First**, it adds the channel to the pubsub_channels recorded by the server. If the channel is newly created, it will create a new hash entry in the pubsub_channels hash table, representing the newly created channel, and create a list to store the subscribers of this channel.
 
 If the channel already exists in the pubsub_channels hash table, the pubsubSubscribeChannel function directly retrieves the list of subscribers for that channel.
 
 **Then**, the pubsubSubscribeChannel function adds the subscriber who executes the subscribe command to the subscriber list.
 
 **Finally**, the pubsubSubscribeChannel function returns the number of successfully subscribed channels to the subscriber.
 
 The code below shows this part of the logic. Take a look:
 
 ```c
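 // Record the channel in the client's own pubsub_channels table; this succeeds only for a new subscription
 if (dictAdd(c->pubsub_channels, channel, NULL) == DICT_OK) {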
    ...
    de = dictFind(server.pubsub_channels, channel); // Find the channel in the pubsub_channels hash table
    if (de == NULL) { // If the channel doesn't exist
       clients = listCreate(); // Create a list for the subscribers
       dictAdd(server.pubsub_channels, channel, clients); // Insert the hash entry for the channel
       ...
    } else {
       clients = dictGetVal(de); // If the channel already exists, get the list of subscribers
    }
    listAddNodeTail(clients, c); // Add the subscriber to the subscriber list
 }
 
 ...
 addReplyLongLong(c, clientSubscriptionsCount(c)); // Return the number of successfully subscribed channels to the subscriber
 ```
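 One detail worth noticing in the code above is the outer dictAdd check on the client's own pubsub_channels table: if the client has already subscribed to the channel, the body is skipped, so subscribing twice does not add the client to the server-side list twice. The sketch below imitates that behavior with fixed-size arrays. The names toy_client, toy_channel, client_has, and subscribe_channel are hypothetical, and this is a simplified model rather than the Redis implementation.

```c
#include <string.h>

#define MAX_SUBS 8

/* Hypothetical client: remembers which channels it has subscribed to. */
typedef struct toy_client {
    const char *channels[MAX_SUBS];
    int nchannels;
} toy_client;

/* Hypothetical server-side record for one channel. */
typedef struct toy_channel {
    const char *name;
    toy_client *subscribers[MAX_SUBS];
    int nsubscribers;
} toy_channel;

/* Returns 1 if the client already subscribed to the named channel. */
int client_has(const toy_client *c, const char *name) {
    for (int i = 0; i < c->nchannels; i++)
        if (strcmp(c->channels[i], name) == 0) return 1;
    return 0;
}

/* Subscribe at most once per channel, then return the client's
 * total subscription count (the value reported back to the client). */
int subscribe_channel(toy_client *c, toy_channel *ch) {
    if (!client_has(c, ch->name) &&
        c->nchannels < MAX_SUBS && ch->nsubscribers < MAX_SUBS) {
        c->channels[c->nchannels++] = ch->name;   /* client-side record */
        ch->subscribers[ch->nsubscribers++] = c;  /* server-side subscriber list */
    }
    return c->nchannels;
}
```

 Calling subscribe_channel twice for the same channel returns 1 both times and leaves exactly one entry in the channel's subscriber list. Subscribing to a second channel then returns 2.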
 
 Now you understand the implementation of the publish-subscribe feature in Redis. Next, let's see how Redis Sentinel uses this publish-subscribe feature in its working process.

Application of Publish-Subscribe Method in Sentinel #

First, let’s take a look at the function sentinelEvent used by Sentinel to publish messages.

sentinelEvent Function and Message Generation #

When using the Publish-Subscribe method, Sentinel wraps message publishing in a function called sentinelEvent (in the sentinel.c file). So, when you read the Sentinel source code in sentinel.c and come across a call to sentinelEvent, it means that Sentinel is publishing a message.

I introduced the sentinelEvent function in Lesson 22, and you can review it there. The prototype of this function is as follows:

void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...)

In fact, this function ultimately calls the pubsubPublishMessage function I mentioned earlier to publish a message to a channel. So, when we want to publish a message, we need to determine two aspects: the channel to publish to and the message to publish.

The second parameter of the sentinelEvent function, type, represents the channel to publish to. The message to publish is built from the fourth parameter, fmt, together with the variable arguments represented by the ellipsis after it.

At this point, you may wonder, why does the sentinelEvent function have an ellipsis in its parameters?

In fact, the ellipsis here represents variable arguments. When we cannot list all argument types and numbers passed to a function, we can use the ellipsis to represent variable arguments. This means that we can pass 4, 5, 6, or even more arguments to the sentinelEvent function.

Here I will use the implementation of the sentinelEvent function as an example to introduce the use of variable arguments. This way, when you develop distributed communication programs and need to generate messages with varying content, you can use the methods implemented in the Sentinel source code.

In the sentinelEvent function, using variable arguments involves four steps:

  • First, we define a variable of type va_list, let’s say ap. This variable is used to walk through the variable arguments.
  • Then, before reading the variable arguments, we call the va_start macro to initialize ap so that it refers to the first variable argument. The va_start macro takes two parameters: the va_list variable ap we just defined, and the last named parameter before the ellipsis, which is fmt in the sentinelEvent function.
  • Next, we call the vsnprintf function to format the variable arguments into a buffer according to the format string fmt. The vsnprintf function retrieves the variable arguments one by one as the format string requires.
  • Finally, once the arguments have been consumed, we call the va_end macro to clean up ap.

The following code shows the process I just described. Take another look:

void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...) {
    va_list ap;
    ... 
    if (fmt[0] != '\0') {
        va_start(ap, fmt);
        vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
        va_end(ap);
    }
    ...
}
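To see the four steps in isolation, here is a stand-alone sketch that you can compile outside of Redis. The function format_event is a hypothetical helper, not part of the Sentinel source. It writes the event type followed by the formatted text into a caller-supplied buffer, appending at the end of what is already in the buffer just as sentinelEvent does with msg.

```c
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: writes "<type> <formatted text>" into buf.
 * Returns the length of the resulting string. */
int format_event(char *buf, size_t len, const char *type, const char *fmt, ...) {
    va_list ap;                                      /* step 1: declare the va_list */
    if (len == 0) return 0;
    snprintf(buf, len, "%s ", type);                 /* fixed prefix: the event type */
    if (fmt[0] != '\0') {
        size_t used = strlen(buf);
        va_start(ap, fmt);                           /* step 2: start after the last named parameter */
        vsnprintf(buf + used, len - used, fmt, ap);  /* step 3: format the variable arguments */
        va_end(ap);                                  /* step 4: clean up */
    }
    return (int)strlen(buf);
}
```

For example, format_event(buf, sizeof(buf), "+monitor", "%s quorum %d", "mymaster", 2) leaves "+monitor mymaster quorum 2" in buf. Because both snprintf and vsnprintf are given the remaining buffer size, an overlong message is truncated instead of overflowing the buffer.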

To give you a more intuitive understanding, I have listed three examples of calling the sentinelEvent function below, which you can study and understand.

The first example is from the sentinelCheckSubjectivelyDown function. When the Sentinel detects that the master node is subjectively offline, this function calls sentinelEvent to publish a message to the “+sdown” channel. Four parameters are passed to sentinelEvent and there are no variable arguments. The “%@” in fmt is a special marker that sentinelEvent itself replaces with the information of the instance ri, as shown below:

sentinelEvent(LL_WARNING,"+sdown",ri,"%@");

The second example is from Sentinel initialization. The sentinelGenerateInitialMonitorEvents function calls sentinelEvent to publish a message to the “+monitor” channel. Here five parameters are passed to sentinelEvent, including one variable argument that represents the quorum threshold of the Sentinel, as shown below:

sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);

The last example corresponds to the sentinel completing the master node switch. In the sentinelFailoverSwitchToPromotedSlave function, sentinelEvent is called to publish a message to the “+switch-master” channel. Here five variable arguments are passed to sentinelEvent: the name, IP, and port of the master node before the failover, and the IP and port of the replica node that is promoted to be the new master node, as shown below:

sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d",
        master->name, master->addr->ip, master->addr->port,
        ref->addr->ip, ref->addr->port);

In this way, you also understand that in the process of working, sentinels use the sentinelEvent function and pubsubPublishMessage function to publish messages. Throughout the sentinel’s work, it will use the sentinelEvent function to publish messages to different channels at key points. Apart from the three channels I just mentioned (+monitor, +sdown, +switch-master), I have listed the message publishing channels used by sentinels in the table below, which you can refer to.

[Table: channels on which Sentinel publishes messages]

In fact, while the sentinels are working, a client that wants to follow the overall progress of a failover can subscribe to the corresponding channels in the above table with the SUBSCRIBE command. For example, it can learn whether the master node has been judged subjectively or objectively offline, whether the leader election has finished, and whether the switch to the new master node is complete.
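As an illustration of the client side, the sketch below parses the payload of a “+switch-master” message. It relies on the format string "%s %s %d %s %d" used in the sentinelEvent call shown earlier, so the payload has five space-separated fields. The struct switch_master_event and the function parse_switch_master are hypothetical names, and the buffer sizes are assumptions chosen for this example.

```c
#include <stdio.h>

/* Hypothetical holder for the five fields of a +switch-master payload. */
typedef struct switch_master_event {
    char name[64];    /* master name */
    char old_ip[46];  /* address of the master before the failover */
    int  old_port;
    char new_ip[46];  /* address of the replica promoted to master */
    int  new_port;
} switch_master_event;

/* Parse "<name> <old ip> <old port> <new ip> <new port>".
 * Returns 0 on success, -1 if the payload does not contain all five fields. */
int parse_switch_master(const char *payload, switch_master_event *ev) {
    int n = sscanf(payload, "%63s %45s %d %45s %d",
                   ev->name, ev->old_ip, &ev->old_port,
                   ev->new_ip, &ev->new_port);
    return n == 5 ? 0 : -1;
}
```

Given the payload "mymaster 127.0.0.1 6379 127.0.0.1 6380", the function fills in the old port 6379 and the new port 6380. A client could then reconnect to the new master address once it sees this message.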

Now, let’s take a look at how sentinel’s message subscription is implemented during its work.

Sentinel Subscribing to the hello Channel #

Firstly, you need to know that each sentinel subscribes to the “__sentinel__:hello” channel of the master node it listens to. In Lesson 23, I introduced that sentinels periodically call the sentinelTimer function to perform periodic tasks, and subscribing to the master node’s hello channel is one of them.

Specifically, when a sentinel executes the sentinelTimer function periodically, it calls the sentinelHandleRedisInstance function, and then calls the sentinelReconnectInstance function. In the sentinelReconnectInstance function, the sentinel calls the redisAsyncCommand function to send the SUBSCRIBE command to the master node. The channel to subscribe to is specified by the macro SENTINEL_HELLO_CHANNEL (defined in the sentinel.c file), which is the “__sentinel__:hello” channel. The code for this part is shown below:

retval = redisAsyncCommand(link->pc,
                sentinelReceiveHelloMessages, ri, "%s %s",
                sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
                SENTINEL_HELLO_CHANNEL);

From the code, we can also see that when the sentinel receives a hello message on the “__sentinel__:hello” channel, it invokes the sentinelReceiveHelloMessages function as the callback. This function in turn calls the sentinelProcessHelloMessage function to process the hello message.

The sentinelProcessHelloMessage function mainly extracts from the hello message the basic information of the sentinel instance that published it, such as its IP, port number, and ID. If the current sentinel has no record of that instance, sentinelProcessHelloMessage calls the createSentinelRedisInstance function to create one, so the current sentinel then has the information of the other sentinel instances.

Alright, after understanding how the sentinel subscribes to and processes the “__sentinel__:hello” channel, we still need to understand when sentinels actually publish hello messages.

This happens during the periodic work driven by the sentinelTimer function: the sentinel calls the sentinelSendPeriodicCommands function, which in turn calls the sentinelSendHello function to publish the message.

The sentinelSendHello function calls the redisAsyncCommand function to publish the hello message to the “__sentinel__:hello” channel of the master node. The hello message it sends includes the IP, port number, ID, and current epoch of the sentinel instance that published the hello message, as well as the name, IP, port number, and epoch information of the master node that the sentinel is monitoring.

The following code shows the generation and publishing of the hello message. Take a look:

// the content of the hello message
snprintf(payload,sizeof(payload),
    "%s,%d,%s,%llu," // the information of the current sentinel instance, including IP, port number, ID, and current epoch
    "%s,%s,%d,%llu", // the information of the current master node, including name, IP, port number, and epoch
    announce_ip, announce_port, sentinel.myid,
    (unsigned long long) sentinel.current_epoch,
    master->name,master_addr->ip,master_addr->port,
    (unsigned long long) master->config_epoch);
// publish the hello message to the hello channel of the master node
retval = redisAsyncCommand(ri->link->cc,
        sentinelPublishReplyCallback, ri, "%s %s %s",
        sentinelInstanceMapCommand(ri,"PUBLISH"),
        SENTINEL_HELLO_CHANNEL,payload);

Therefore, when a sentinel publishes a hello message to the “__sentinel__:hello” channel of the master node it listens to through sentinelSendHello, the other sentinels listening to the same master node, which have also subscribed to that channel, receive the hello message.

Through this communication method, sentinels that listen to the same master node can know each other’s access information. In this way, sentinels can perform master node status judgments and leader elections based on this access information.
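The receiving side of this exchange can be sketched as follows. The code parses a payload laid out like the snprintf format above (eight comma-separated fields) and records the sender if it has not been seen before. This is a simplified model, not the logic of sentinelProcessHelloMessage: the names hello_msg, known_sentinels, parse_hello, and remember_sentinel are hypothetical, the buffer sizes are assumptions, and identifying a sentinel only by its ID is a simplification made for this example.

```c
#include <stdio.h>
#include <string.h>

#define MAX_KNOWN 8

/* Hypothetical holder for the eight fields of a hello message. */
typedef struct hello_msg {
    char ip[46];
    int  port;
    char runid[41];
    unsigned long long current_epoch;
    char master_name[64];
    char master_ip[46];
    int  master_port;
    unsigned long long master_config_epoch;
} hello_msg;

/* Hypothetical table of sentinel IDs discovered so far. */
typedef struct known_sentinels {
    char runids[MAX_KNOWN][41];
    int  count;
} known_sentinels;

/* Parse "ip,port,id,epoch,master_name,master_ip,master_port,master_epoch".
 * Returns 0 on success, -1 if any of the eight fields is missing. */
int parse_hello(const char *payload, hello_msg *h) {
    int n = sscanf(payload, "%45[^,],%d,%40[^,],%llu,%63[^,],%45[^,],%d,%llu",
                   h->ip, &h->port, h->runid, &h->current_epoch,
                   h->master_name, h->master_ip, &h->master_port,
                   &h->master_config_epoch);
    return n == 8 ? 0 : -1;
}

/* Record the sender if it is not known yet.
 * Returns 1 if a new record was created, 0 if it was already known or the table is full. */
int remember_sentinel(known_sentinels *k, const hello_msg *h) {
    for (int i = 0; i < k->count; i++)
        if (strcmp(k->runids[i], h->runid) == 0) return 0;
    if (k->count == MAX_KNOWN) return 0;
    snprintf(k->runids[k->count++], sizeof(k->runids[0]), "%s", h->runid);
    return 1;
}
```

The first hello message from a given sentinel makes remember_sentinel return 1 and adds a record. Later hello messages from the same sentinel return 0, which corresponds to the case where the current sentinel already has the publisher's information.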

Summary #

In today’s class, we learned about the publish-subscribe communication method implemented by Redis. With this method, publishers and subscribers communicate by exchanging messages through channels. Sentinel publishes to differently named channels, and each channel name corresponds to a different state in the sentinel’s work process. When a client needs to understand the progress of the sentinel’s work or the status of the master node, it can subscribe to the channels where the sentinel publishes messages.

Of course, the one channel a sentinel always subscribes to is the “__sentinel__:hello” channel of the master node it is monitoring. Through this channel, different sentinels monitoring the same master node exchange hello messages that carry information such as each sentinel’s IP address and port number.

Furthermore, in this class, I also introduced you to a trick for using variable arguments in the C language. When developing publish-subscribe functionality, you need to generate messages to be published, and variable arguments can be used to generate messages of varying lengths. I hope you can apply this little trick.

Questions for Each Lesson #

If we execute the publish command on a sentinel instance, is this command processed by the publishCommand function in the pubsub.c file?