27 Handling Commands from MOVED ASK in Cluster Nodes #

In the previous lesson, I introduced three aspects of Redis Cluster: how nodes exchange information and their running status, how nodes handle commands, and how data migrates between nodes. Through the previous lesson, we have already learned the basic implementation of the Gossip protocol, which includes the data structure and key function design and implementation that support inter-node information and running status exchange in the cluster.

So in today’s lesson, let’s look at how cluster nodes handle commands. This covers not only the basic process by which a cluster node handles a command, but also the implementation of the two cluster-specific redirection replies, MOVED and ASK, which correspond to the request redirection scenarios in Redis Cluster. With this understanding, we can use Redis Cluster as a reference when designing and implementing request redirection in our own distributed systems.

Next, let’s first take a look at the basic process of how a cluster node handles a command, which will give us an overall understanding of the implementation of cluster nodes.

Basic Process of Cluster Node Command Handling #

In [Lesson 14], I mentioned that the process of handling a Redis server command can be divided into four stages: command reading, command parsing, command execution, and result returning. Just like a single Redis server, nodes in Redis Cluster also handle commands in the same stages.

Therefore, the entry functions for command handling in each stage of cluster nodes are the same as those of a single Redis server, as shown in the following figure. You can also review the detailed process of command handling that I introduced in Lesson 14.

However, if a Redis server is a cluster node, there will be an additional processing step during the command execution stage, which corresponds to the issue of request redirection that may occur in Redis Cluster.

The term request redirection refers to the situation where a cluster node receives a command from a client and finds that the requested data is not available locally. Therefore, the node needs to redirect the client’s request to the node that actually owns the data so that the client’s command can be executed normally.

It is important to note that request redirection is a common problem in the design of distributed systems. Especially for distributed systems like Redis Cluster, which do not use centralized third-party systems to maintain data distribution, request redirection is inevitable when the cluster experiences data migration due to load balancing or node failures. So, understanding this design step is also of great reference value for developing distributed systems.

Now, let’s first take a look at the additional processing step for cluster nodes during the command execution stage, which is implemented in the processCommand function (in the server.c file).

While it runs, the processCommand function checks whether the current node is in cluster mode, which is recorded in the cluster_enabled member of the global variable server. If the current node is in cluster mode, the processCommand function checks whether request redirection is needed.

Of course, if the current node receives the command from its master node in the cluster, or if the command received does not contain a key parameter, in these cases, the cluster node will not be involved in the operation of request redirection. However, there is one exception for a command without a key parameter, which is the EXEC command. If the current node receives the EXEC command, the processCommand function will still check whether request redirection should be performed.

So, how does the processCommand function determine whether to perform request redirection?

In fact, it calls the getNodeByQuery function (in the cluster.c file) to query which cluster node can process the received command. If the getNodeByQuery function returns NULL, or if the node it returns is not the current node, then the processCommand function calls the clusterRedirectClient function (in the cluster.c file) to perform the actual request redirection.

The following code shows the additional processing step for request redirection during the command execution process in cluster nodes:

int processCommand(client *c) {
   // ...
   // If the current Redis server enables the Redis Cluster mode; The command is not from the master node of the current node;
   // The command contains the key parameter, or the command is EXEC
   if (server.cluster_enabled && !(c->flags & CLIENT_MASTER)
            && !(c->flags & CLIENT_LUA && server.lua_caller->flags & CLIENT_MASTER)
            && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
                    c->cmd->proc != execCommand))
   {
      // ...
      clusterNode *n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code); // Query which cluster node can process the current command
      if (n == NULL || n != server.cluster->myself) {
         // ...
         clusterRedirectClient(c, n, hashslot, error_code); // Perform the actual request redirection
         return C_OK;
      }
   }
   // ...
}

Of course, if request redirection is not required, the processCommand function will continue with the subsequent steps and call the call function to execute the command.

The following figure shows the basic execution logic added for cluster nodes in the processCommand function. You can review it again.

Alright, next, let’s see how the getNodeByQuery function queries the cluster node that can process a command.

How Is the Cluster Node That Can Execute a Command Found? #

First, let’s take a look at the prototype of the getNodeByQuery function, as shown below:

clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code)

Its parameters include the client, the command, and the arguments received by the node. It also takes two pointers, hashslot and error_code, through which it reports the slot (hash slot) of the key accessed by the command and the error code produced by the function, respectively. The return value is a pointer to a clusterNode, representing the cluster node that can process the command.

Next, let’s take a look at the specific execution process of the getNodeByQuery function. This process can be divided into three steps.

Step 1: Encapsulate the received command using the multiState structure #

A cluster node can receive a MULTI command, which means that the subsequent commands are to be executed as a transaction. When a Redis server receives a MULTI command from the client, it calls the processing function multiCommand (in the multi.c file), which sets the CLIENT_MULTI flag in the client structure variable to indicate that the client is in MULTI mode, as shown below:

void multiCommand(client *c) {
   // ...
   c->flags |= CLIENT_MULTI; // Set the CLIENT_MULTI flag in the client structure
   addReply(c,shared.ok);
}

When processing commands in the processCommand function (as explained earlier), the server checks whether the client variable c has the CLIENT_MULTI flag. If so, and the current command is not EXEC, DISCARD, MULTI, or WATCH, the processCommand function calls the queueMultiCommand function to cache the command in the mstate member variable of the client structure. The mstate member variable is of type multiState; it records the commands received after the MULTI command and how many of them there are.

The following code shows how the processCommand function handles the CLIENT_MULTI flag. You can also read the queueMultiCommand function (in the multi.c file) and the client structure (in the server.h file) to understand the recording process of subsequent commands after the MULTI command in further detail.

int processCommand(client *c) {
    // ...
    // If the client has the CLIENT_MULTI flag and the current command is not EXEC, DISCARD, MULTI, or WATCH
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c); // Cache the command
        // ...
    }
}
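
As a rough illustration of this queueing rule, the following sketch models a client with a flag and a small fixed-size queue. The names toy_client, TOY_CLIENT_MULTI, TOY_QUEUE_MAX, and toy_process are made up for this example; the real logic lives in processCommand and queueMultiCommand and handles many more cases.

```c
#include <assert.h>
#include <string.h>

#define TOY_CLIENT_MULTI 1    /* hypothetical stand-in for CLIENT_MULTI */
#define TOY_QUEUE_MAX 16      /* hypothetical fixed queue capacity */

typedef struct {
    int flags;
    const char *queued[TOY_QUEUE_MAX];  /* command names waiting for EXEC */
    int count;
} toy_client;

/* Returns 1 if the command was queued, 0 if it should be handled right away,
 * and -1 if the toy queue is full. */
int toy_process(toy_client *c, const char *name) {
    assert(c != NULL && name != NULL);
    int control = strcmp(name, "EXEC") == 0 || strcmp(name, "DISCARD") == 0 ||
                  strcmp(name, "MULTI") == 0 || strcmp(name, "WATCH") == 0;
    if ((c->flags & TOY_CLIENT_MULTI) && !control) {
        if (c->count == TOY_QUEUE_MAX) return -1;
        c->queued[c->count++] = name;   /* cache the command for EXEC */
        return 1;
    }
    if (strcmp(name, "MULTI") == 0) c->flags |= TOY_CLIENT_MULTI;
    return 0;
}
```

In this model, a GET sent before MULTI is handled immediately, while a SET sent after MULTI is only queued.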

Actually, the aforementioned process of how Redis server handles the MULTI command and caches subsequent commands is also applicable to cluster nodes. In other words, the getNodeByQuery function needs to consider the MULTI command when querying the key of the command.

Therefore, in order to use the same data structure to handle subsequent commands after the MULTI command and regular single commands, the getNodeByQuery function uses the multiState structure to encapsulate the current command, as shown below:

multiState *ms, _ms; // Use the multiState structure to encapsulate the current command
multiCmd mc;         // Holds the current command when it is not part of a transaction
// ...
if (cmd->proc == execCommand) { // On EXEC, check the keys accessed by the commands queued after MULTI, which are cached in the client variable c
   // ...
   ms = &c->mstate;
} else {
   ms = &_ms;  // If it is another command, also use the multiState structure to encapsulate the command
   _ms.commands = &mc;
   _ms.count = 1;  // The number of encapsulated commands is 1
   mc.argv = argv; // Command arguments
   mc.argc = argc; // The number of command arguments
   mc.cmd = cmd; // The command itself
}

Please note that the subsequent commands that are cached after the MULTI command are not executed immediately; they will be executed later when the EXEC command is executed. Therefore, in the aforementioned code, the getNodeByQuery function also retrieves the cached commands from the client variable c only when the EXEC command is received.

Alright, up to this point, you can see that the getNodeByQuery function uses the multiState structure to encapsulate the current command. Next, it will check the keys visited by the command.
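
The idea of wrapping a single command in the same structure as a transaction can be sketched in a few lines. The types toy_cmd and toy_batch and both functions are hypothetical and much simpler than multiCmd and multiState; the point is only that one loop serves both cases.

```c
#include <assert.h>
#include <stddef.h>

typedef struct { const char *name; int argc; } toy_cmd;      /* plays the role of multiCmd */
typedef struct { toy_cmd *commands; int count; } toy_batch;  /* plays the role of multiState */

/* Total number of arguments across every command in the batch. */
int toy_total_args(const toy_batch *b) {
    assert(b != NULL);
    int total = 0;
    for (int i = 0; i < b->count; i++) total += b->commands[i].argc;
    return total;
}

/* Wrap one command in a batch of size 1, so callers only need the loop above. */
toy_batch toy_wrap_single(toy_cmd *cmd) {
    assert(cmd != NULL);
    toy_batch b = { cmd, 1 };
    return b;
}
```

A caller that receives a regular command wraps it with toy_wrap_single, while a caller that receives EXEC passes the already queued batch; the code that walks the commands does not need to know which case it is in.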

Step 2: Check the slot of each key accessed by the commands, one by one #

The getNodeByQuery function will execute a loop according to the number of commands recorded in the multiState structure. It will check the keys visited by each command one by one. Specifically, it calls the getKeysFromCommand function (in db.c file) to obtain the positions and number of keys in the command.

Then, for each key, it calls the keyHashSlot function (in the cluster.c file) to compute the slot of the key, and looks up the cluster node that owns this slot in the slots array of the cluster member variable of the global variable server, as shown below:

for (i = 0; i < ms->count; i++) {
   // ...
   // Get the positions and number of keys in the command
   keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
   // Handle each key
   for (j = 0; j < numkeys; j++) {
      // ...
      int thisslot = keyHashSlot((char*)thiskey->ptr, // Get the slot to which the key belongs
                                 sdslen(thiskey->ptr));
      if (firstkey == NULL) {
         // ...
         slot = thisslot;
         n = server.cluster->slots[slot]; // Find the cluster node to which the slot corresponding to the key belongs
      }
      // ...
    }
}
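
To make the key-to-slot mapping concrete, here is a minimal sketch of the calculation described in the Redis Cluster specification: CRC16 of the key modulo 16384, where only the part between the first { and the following } is hashed if that part is non-empty. The function names toy_crc16 and toy_key_hash_slot are made up for this example, and the CRC is computed bit by bit rather than with the lookup table a real implementation would typically use.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Bitwise CRC16 (XMODEM variant: polynomial 0x1021, initial value 0). */
uint16_t toy_crc16(const char *buf, size_t len) {
    assert(buf != NULL);
    uint16_t crc = 0;
    for (size_t i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i] << 8);
        for (int bit = 0; bit < 8; bit++) {
            if (crc & 0x8000) crc = (uint16_t)((crc << 1) ^ 0x1021);
            else crc = (uint16_t)(crc << 1);
        }
    }
    return crc;
}

/* Map a key to one of 16384 slots, hashing only the {tag} if one is present. */
unsigned int toy_key_hash_slot(const char *key, size_t keylen) {
    assert(key != NULL);
    const char *open = memchr(key, '{', keylen);
    if (open != NULL) {
        size_t rest = keylen - (size_t)(open - key) - 1;
        const char *close = memchr(open + 1, '}', rest);
        if (close != NULL && close != open + 1)   /* tag exists and is non-empty */
            return toy_crc16(open + 1, (size_t)(close - open - 1)) & 0x3FFF;
    }
    return toy_crc16(key, keylen) & 0x3FFF;       /* 0x3FFF == 16384 - 1 */
}
```

With this sketch, the keys {user1000}.following and {user1000}.followers map to the same slot, which is what allows multi-key commands on them to pass the cross-slot check.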

Next, the getNodeByQuery function distinguishes the following three cases based on the result of the node lookup.

  • Case 1: If no cluster node is found for the slot, the function sets error_code to CLUSTER_REDIR_DOWN_UNBOUND and returns NULL.
if (n == NULL) {
    if (error_code)
       *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
    return NULL;
}
  • Case 2: If the node found is the current node, and the slot that the key belongs to is being migrated to another node, the getNodeByQuery function sets the migrating_slot variable to 1 to indicate that data migration is in progress.
  • Case 3: Otherwise, if the slot that the key belongs to is being imported into the current node, the getNodeByQuery function sets the importing_slot variable to 1 to indicate that data import is in progress.

The code logic for cases 2 and 3 is as follows:

// If the slot that the key belongs to is being migrated, set migrating_slot to 1
if (n == myself && server.cluster->migrating_slots_to[slot] != NULL)
{
   migrating_slot = 1;
} 
// If the slot that the key belongs to is being imported, set importing_slot to 1
else if (server.cluster->importing_slots_from[slot] != NULL) {
   importing_slot = 1;
}

Here, it is important to note that if the commands contain more than one key and these keys belong to different slots, the getNodeByQuery function sets error_code to CLUSTER_REDIR_CROSS_SLOT and returns NULL.

At this point, the getNodeByQuery function has found the slot of the key accessed by the command and the cluster node that owns this slot. If the slot is currently being migrated or imported, the getNodeByQuery function calls the lookupKeyRead function (in the db.c file) to check whether the key accessed by the command is present in the current node’s database. If not, it increments the missing_keys variable, which counts the missing keys, as shown below:

// If the slot to which the key belongs is being migrated or imported, and the key being accessed is not in the local database, increment missing_keys
if ((migrating_slot || importing_slot) && lookupKeyRead(&server.db[0],thiskey) == NULL)
{
    missing_keys++;
}

Next, the getNodeByQuery function will return the corresponding results based on the slot check.

Step 3: Return hashslot, error_code, and the corresponding cluster node based on the slot check result. #

In the return result of the getNodeByQuery function, we can focus on the following four cases.

Case 1: The commands access no key at all, so the variable n is still NULL after the loop in step 2. (A key whose slot has no owning node was already handled inside the loop, where the function returns NULL with CLUSTER_REDIR_DOWN_UNBOUND.) In this case, the getNodeByQuery function returns the current node, because a command without keys can be served locally without any redirection.

if (n == NULL) return myself;

Case 2: The slot to which the accessed key belongs is currently being migrated or imported, and the current command is MIGRATE, the command that performs the data migration itself. In this case, the getNodeByQuery function returns the current node, as shown below:

if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
   return myself;

Case 3: The slot to which the accessed key belongs is currently undergoing data migration, and the key being accessed in the current node’s database is missing, i.e., missing_keys introduced earlier is greater than 0. In this case, the getNodeByQuery function will set error_code to CLUSTER_REDIR_ASK and return the target node for data migration.

if (migrating_slot && missing_keys) {
    if (error_code) *error_code = CLUSTER_REDIR_ASK;
    return server.cluster->migrating_slots_to[slot];
}

Case 4: The node corresponding to the slot to which the accessed key belongs is not the current node, but another node. In this case, the getNodeByQuery function will set error_code to CLUSTER_REDIR_MOVED and return the actual node corresponding to the slot to which the key belongs.

if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
return n;
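
The decisions above can be condensed into a small, self-contained model. This is only a sketch under simplifying assumptions: nodes are plain integer ids, the slot of each key is passed in directly, and the MIGRATE special case, the importing side, and the cluster state check are left out. All names (toy_cluster, toy_redir, toy_route, and so on) are hypothetical and do not appear in the Redis source.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_SLOTS 16384
#define TOY_NO_NODE (-1)

typedef enum {
    TOY_REDIR_NONE, TOY_REDIR_CROSS_SLOT, TOY_REDIR_ASK,
    TOY_REDIR_MOVED, TOY_REDIR_DOWN_UNBOUND
} toy_redir;

typedef struct {
    int self;                     /* id of the node running this code */
    int owner[TOY_SLOTS];         /* slot -> owning node id, or TOY_NO_NODE */
    int migrating_to[TOY_SLOTS];  /* slot -> migration target, or TOY_NO_NODE */
} toy_cluster;

void toy_cluster_init(toy_cluster *cl, int self) {
    assert(cl != NULL);
    cl->self = self;
    for (int s = 0; s < TOY_SLOTS; s++) {
        cl->owner[s] = TOY_NO_NODE;
        cl->migrating_to[s] = TOY_NO_NODE;
    }
}

/* Returns the node id that should serve the command, or TOY_NO_NODE on error.
 * slots[] holds the slot of each key; missing_keys counts keys absent locally. */
int toy_route(const toy_cluster *cl, const int *slots, int nkeys,
              int missing_keys, toy_redir *err) {
    assert(cl != NULL && err != NULL);
    *err = TOY_REDIR_NONE;
    if (nkeys == 0) return cl->self;   /* no key at all: serve locally */

    int slot = slots[0];
    for (int i = 1; i < nkeys; i++) {
        if (slots[i] != slot) { *err = TOY_REDIR_CROSS_SLOT; return TOY_NO_NODE; }
    }
    int n = cl->owner[slot];
    if (n == TOY_NO_NODE) { *err = TOY_REDIR_DOWN_UNBOUND; return TOY_NO_NODE; }

    /* Slot is leaving this node and the key is already gone: ask the target. */
    if (n == cl->self && cl->migrating_to[slot] != TOY_NO_NODE && missing_keys > 0) {
        *err = TOY_REDIR_ASK;
        return cl->migrating_to[slot];
    }
    if (n != cl->self) *err = TOY_REDIR_MOVED;  /* slot belongs to another node */
    return n;
}
```

Note that in this model, as in the cases above, a key that is still present locally in a migrating slot is served by the current node; ASK is produced only when the key is missing.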

At this point, we have understood the process of key lookup in the getNodeByQuery function. I have created a diagram showing the basic execution process of the getNodeByQuery function, which you can review.

So now, having obtained the query result for the node to which the key belongs, how does the processCommand function proceed with request redirection?

In fact, this step is accomplished by executing the function clusterRedirectClient, which handles request redirection.

Performing Request Redirection with the clusterRedirectClient Function #

The clusterRedirectClient function is called when the getNodeByQuery function returns NULL or a node that is not the current node.

The logic of the clusterRedirectClient function is relatively simple. It executes the corresponding code branch based on the different values of the error_code returned by the getNodeByQuery function. Its main purpose is to return information about the cluster node corresponding to the slot to which the key belongs to the client, so that the client can make appropriate processing based on the returned information. For example:

  • When the error_code is set to CLUSTER_REDIR_CROSS_SLOT, the clusterRedirectClient function returns a CROSSSLOT error to the client with the message “Keys in request don't hash to the same slot”.
  • When the error_code is set to CLUSTER_REDIR_MOVED, the clusterRedirectClient function returns a MOVED reply to the client, which carries the slot to which the key belongs and the IP and port number of the node that actually owns the slot.
  • When the error_code is set to CLUSTER_REDIR_ASK, the clusterRedirectClient function returns an ASK reply to the client, which carries the slot to which the key belongs and the IP and port number of the target node to which the slot is being migrated.

The code below shows how the clusterRedirectClient function handles the three error_codes mentioned above. You can take a look at it.

void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
    if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
        addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
    }
    // ...
    else if (error_code == CLUSTER_REDIR_MOVED || error_code == CLUSTER_REDIR_ASK)
    {
        addReplySds(c,sdscatprintf(sdsempty(),
            "-%s %d %s:%d\r\n",
            (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
            hashslot,n->ip,n->port));
    }
    // ...
}
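
To see what the MOVED and ASK replies look like on the wire, the format string from the code above can be tried out in isolation. The helper toy_format_redirect is a hypothetical stand-in that uses snprintf instead of the sds functions; the slot, address, and port in the usage note are example values only.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Writes "-ASK <slot> <ip>:<port>\r\n" or "-MOVED <slot> <ip>:<port>\r\n"
 * into buf. Returns the number of characters written (excluding the
 * terminating NUL), or -1 if buf is too small. */
int toy_format_redirect(char *buf, size_t len, int is_ask,
                        int slot, const char *ip, int port) {
    assert(buf != NULL && ip != NULL);
    int n = snprintf(buf, len, "-%s %d %s:%d\r\n",
                     is_ask ? "ASK" : "MOVED", slot, ip, port);
    return (n < 0 || (size_t)n >= len) ? -1 : n;
}
```

For instance, calling toy_format_redirect(buf, sizeof buf, 0, 12182, "127.0.0.1", 7002) fills buf with the line -MOVED 12182 127.0.0.1:7002 followed by \r\n. The leading - marks the reply as an error in the Redis protocol, which is why cluster clients must inspect error replies.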

With this, the process of processing commands received by cluster nodes is completed.

Finally, I would like to remind you that there is a difference between Redis Cluster clients and clients for individual Redis servers. Redis Cluster clients need to handle the error replies returned by nodes. For example, if a cluster node returns a MOVED reply, the client needs to resend the command to the node that actually owns the data, using the IP and port number included in the reply.
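
On the client side, handling a redirection starts with parsing the reply. The following is a minimal sketch, assuming the reply has the form produced by clusterRedirectClient and that the host contains no colon (so IPv6 addresses are not covered). The type toy_redirect and the function toy_parse_redirect are hypothetical and do not come from any Redis client library.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef struct {
    int is_ask;      /* 1 for ASK, 0 for MOVED */
    int slot;
    char host[64];
    int port;
} toy_redirect;

/* Parses "-MOVED <slot> <host>:<port>" or "-ASK <slot> <host>:<port>".
 * Returns 1 on success, 0 if the reply is not a redirection.
 * On failure the contents of *out are unspecified. */
int toy_parse_redirect(const char *reply, toy_redirect *out) {
    assert(reply != NULL && out != NULL);
    char kind[8];
    if (reply[0] != '-') return 0;   /* not an error reply */
    if (sscanf(reply + 1, "%7s %d %63[^:]:%d",
               kind, &out->slot, out->host, &out->port) != 4) return 0;
    if (strcmp(kind, "MOVED") == 0) out->is_ask = 0;
    else if (strcmp(kind, "ASK") == 0) out->is_ask = 1;
    else return 0;
    return 1;
}
```

What the client does next depends on the kind of reply. After MOVED, it should update its slot-to-node mapping and send this and later commands for the slot to the new node. After ASK, it should send an ASKING command followed by the original command to the target node for this one request only, without updating its mapping, because the slot migration has not finished yet.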

Summary #

In this lesson, I introduced to you the process of how cluster nodes handle client commands. Similar to the process of a single Redis server handling commands, cluster nodes also go through four stages: command reading, parsing, execution, and result return. Cluster nodes also use the same entry point processing function as a single Redis server.

However, you should know that Redis Cluster performs data migration due to load balancing or node failures. This can result in the client accessing keys that are not located on the node receiving the command. Therefore, in the processCommand function of cluster nodes, additional logic is added for cluster mode. This primarily includes calling the getNodeByQuery function to query which node the accessed key actually belongs to, and calling the clusterRedirectClient function to execute request redirection based on the query result.

In fact, for distributed clusters, the request redirection mechanism designed and implemented by Redis Cluster is a good reference example. It distinguishes two situations: a slot that is owned by another node, which is answered with MOVED, and a slot that is still being migrated while the requested key is no longer present locally, which is answered with ASK. This design is worth learning. Additionally, when the getNodeByQuery function queries the slot and node to which a key belongs, it also takes Redis transactions into account. It uses the same data structure, multiState, to encapsulate both the multiple commands of a transaction and a regular single command, so the same key-checking code serves both. This increases code reusability, which is also worth learning.

Of course, in this lesson, we have mentioned data migration several times. Therefore, in the next lesson, I will introduce the specific implementation of data migration in Redis Cluster.

Question for each lesson #

After calling the getNodeByQuery function, the processCommand function actually calls the clusterRedirectClient function to redirect the request. Before doing so, it calls either the discardTransaction or flagTransaction function depending on whether the current command is EXEC or not. By reading the source code, do you know the purpose of calling discardTransaction and flagTransaction here?

int processCommand(client *c) {
    // ...
    clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                    &hashslot,&error_code);
    if (n == NULL || n != server.cluster->myself) {
        if (c->cmd->proc == execCommand) {
            discardTransaction(c);
        } else {
            flagTransaction(c);
        }
        clusterRedirectClient(c,n,hashslot,error_code);
        return C_OK;
    }
    // ...
}

Answer #

Both functions keep a MULTI transaction consistent when a command has to be redirected:

  1. discardTransaction: This function is called when the command being redirected is EXEC. It discards the entire transaction associated with the client: the queued commands are released and the client leaves MULTI state. Since the keys belong to a different node in the cluster, the transaction must not be executed on the current node.

  2. flagTransaction: This function is called when the command being redirected is not EXEC. If the client is in MULTI state, it sets the CLIENT_DIRTY_EXEC flag on the client. The redirected command was never queued, so the transaction is incomplete; with this flag set, a later EXEC aborts the transaction instead of executing only part of it.

These two functions ensure that a transaction is either discarded or marked to be aborted when one of its commands is redirected to a different node, which maintains the integrity of multi-command transactions in a cluster.