28 Does Redis Cluster Data Migration Cause Blockage

In the previous lesson, I introduced how Redis Cluster nodes handle commands. You now know that during this process, the node calls the getNodeByQuery function to check which node the accessed key belongs to. If the node receiving the command is not the owner of the key, it generates a CLUSTER_REDIR_MOVED or CLUSTER_REDIR_ASK error code and replies to the client with a MOVED or ASK redirection error.

In fact, these two error messages correspond to data migration in Redis Cluster. Data migration is a common problem in distributed storage clusters. When the load pressure on the cluster nodes is unbalanced, or new nodes join or existing nodes go offline, data needs to be migrated between different nodes. Therefore, designing and implementing data migration is something we need to consider in the process of cluster development.

So in today’s lesson, I will introduce how Redis Cluster implements data migration. Understanding this part from the source code level can help you understand the impact of data migration on the normal processing of commands by cluster nodes, so that you can choose the right time to perform migration. Moreover, mastering the implementation of Redis data migration can also provide you with a good reference example for your own cluster development.

Alright, next, let’s take a look at the main data structures related to data migration. These data structures are important as they record the status information of data migration.

Data Structure for Recording Data Migration #

First of all, you need to know that in Redis Cluster, key-value pairs are first mapped to hash slots, and then data distribution among cluster nodes is achieved by assigning slots to different nodes. For more knowledge about this, you can refer to Lesson 9 of the first season.
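As a reminder of how the key-to-slot mapping works, here is a minimal sketch. It assumes the standard Redis Cluster rule of taking the CRC16 (XMODEM variant) of the key modulo 16384. The function names crc16_xmodem and key_hash_slot are hypothetical, and hash tags (the {...} syntax) are deliberately ignored to keep the example short.

```c
#include <stddef.h>
#include <stdint.h>

#define CLUSTER_SLOTS 16384

/* Bitwise CRC16, XMODEM variant: polynomial 0x1021, initial value 0,
 * no reflection. Redis itself uses a table-driven version. */
uint16_t crc16_xmodem(const char *buf, size_t len) {
    uint16_t crc = 0;
    for (size_t i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i] << 8);
        for (int bit = 0; bit < 8; bit++) {
            if (crc & 0x8000)
                crc = (uint16_t)((crc << 1) ^ 0x1021);
            else
                crc = (uint16_t)(crc << 1);
        }
    }
    return crc;
}

/* Hypothetical helper: map a key to one of the 16384 slots.
 * Hash tags are not handled in this sketch. */
unsigned int key_hash_slot(const char *key, size_t len) {
    return crc16_xmodem(key, len) & (CLUSTER_SLOTS - 1);
}
```

Because 16384 is a power of two, masking with CLUSTER_SLOTS - 1 gives the same result as taking the remainder modulo 16384.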

In terms of the implementation in the source code, each cluster node in Redis Cluster corresponds to a clusterNode structure (defined in the cluster.h file). This structure contains a character array used to record which slots are assigned to the current node.

Here is the definition of this array. Its length is defined by the macro CLUSTER_SLOTS divided by 8. CLUSTER_SLOTS is defined to be 16384, which represents the total number of slots in Redis Cluster. After dividing by 8, it means that each bit of each element in the array represents one slot. If the value of a bit is 1, it means that the current node is responsible for that slot.

typedef struct clusterNode {
   ...
   unsigned char slots[CLUSTER_SLOTS/8];
   ...
} clusterNode;
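To make the bit-per-slot layout concrete, here is a minimal sketch of how a slot can be set, cleared, and tested in such a bitmap. The toyNode type and the toy_* helpers are hypothetical names used only for illustration; they are not the functions Redis uses.

```c
#include <string.h>

#define CLUSTER_SLOTS 16384

/* Toy stand-in for the slots field of clusterNode. */
typedef struct toyNode {
    unsigned char slots[CLUSTER_SLOTS / 8]; /* 2048 bytes, one bit per slot */
} toyNode;

/* Mark the slot as served by this node: byte index is slot / 8,
 * bit index inside that byte is slot % 8. */
void toy_set_slot(toyNode *n, int slot) {
    n->slots[slot >> 3] |= (unsigned char)(1 << (slot & 7));
}

/* Clear the bit for the slot. */
void toy_clear_slot(toyNode *n, int slot) {
    n->slots[slot >> 3] &= (unsigned char)~(1 << (slot & 7));
}

/* Return 1 if the bit for the slot is set, 0 otherwise. */
int toy_owns_slot(const toyNode *n, int slot) {
    return (n->slots[slot >> 3] & (1 << (slot & 7))) != 0;
}
```

Storing one bit per slot keeps the array at 2048 bytes, which is far smaller than one byte or one pointer per slot.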

However, the slots array in clusterNode alone is not enough to record data migration. Therefore, Redis Cluster has designed a clusterState structure (also defined in cluster.h) for the entire cluster. This structure contains three arrays of clusterNode pointers and a radix tree of rax type. Each of the three arrays has 16384 elements, one for each slot in the cluster, as shown below:

typedef struct clusterState {
   ...  
   clusterNode *migrating_slots_to[CLUSTER_SLOTS];
   clusterNode *importing_slots_from[CLUSTER_SLOTS];
   clusterNode *slots[CLUSTER_SLOTS];
   rax *slots_to_keys;
   ...
} clusterState;

These structures are mainly used to record the data migration. Here is their meaning:

  • The migrating_slots_to array represents which node a slot that the current node is responsible for is migrating to. For example, migrating_slots_to[K] = node1 means that the slot K, which the current node is responsible for, is being migrated to node1.
  • The importing_slots_from array represents from which node the current node is importing a certain slot. For example, importing_slots_from[L] = node3 means that the current node is importing slot L from node3.
  • The slots array represents which node is responsible for each of the 16384 slots. For example, slots[M] = node2 means that node2 is responsible for slot M.
  • The slots_to_keys radix tree is used to record the mapping between slots and keys, allowing fast lookup of the keys on a slot.

Now that we have understood the data structures used to record data migration, let’s learn about the specific process of data migration.

Design and Implementation of Data Migration Process #

The entire process of migrating data in Redis Cluster can be divided into five major steps:

  • Marking the source and target nodes for migration;
  • Retrieving the keys to be migrated;
  • Migrating the data from the source node;
  • Processing the migrated data on the target node;
  • Marking the migration result.

Now let’s take a look at the source code implementation of these five steps.

Marking the Source and Target Nodes #

In Redis Cluster, when migrating data, we first need to use the CLUSTER SETSLOT command to mark the source node of the data to be migrated on the target node. The command is as follows:

CLUSTER SETSLOT <slot> IMPORTING <node>

Here, <slot> represents the hash slot to be migrated, and <node> represents the source node responsible for <slot>.

Then, we need to use the CLUSTER SETSLOT command to mark the target node for the data to be migrated on the source node. The command is as follows:

CLUSTER SETSLOT <slot> MIGRATING <node>

Similarly, <slot> represents the hash slot to be migrated, and <node> represents the target node that <slot> will be migrated to.

To help you understand, let me give an example. Let’s assume that slot 3 is on node A, and we want to migrate slot 3 from node A to node B. In this case, node A is the source node for the data to be migrated, and node B is the target node for the data to be migrated. We need to execute the following command on node B to mark the source node:

CLUSTER SETSLOT 3 IMPORTING <node-A-id>

Then, on node A, we execute the following command to mark the target node:

CLUSTER SETSLOT 3 MIGRATING <node-B-id>

The CLUSTER command is handled by the clusterCommand function (in the cluster.c file), which executes different code branches depending on the option carried by the command. For the SETSLOT option, the branches that mark a slot as migrating out or migrating in are as follows:

void clusterCommand(client *c) {
    ...
    // Handle the SETSLOT option
    else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
        ...
        // Handle the migrating flag
        if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
            ...
        }
        // Handle the importing flag
        else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
            ...
        }
        ...
    }
    ...
}

Here, let’s take a look at the specific logic for handling the migrating and importing flags. In fact, the clusterCommand function basically divides the handling logic for these two flags into three steps.

Step 1: For migrating out (the migrating flag), the function checks whether the slot belongs to the current node; for migrating in (the importing flag), it checks whether the slot already belongs to the current node. If the slot to be migrated out is not on the current node, or the slot to be migrated in is already on the current node, the clusterCommand function returns an error message, because the node cannot perform the slot migration in either case.

Step 2: If the check passes, the clusterCommand function calls the clusterLookupNode function (in the cluster.c file) to look up the node ID carried in the CLUSTER SETSLOT command. The clusterLookupNode function searches the nodes dictionary of server.cluster (the cluster state held by the global server variable) and returns the clusterNode that matches the given node ID.

Step 3: The clusterCommand function sets the migrating slot in the migrating_slots_to array or the importing slot in the importing_slots_from array to the result of the clusterLookupNode function.
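The three steps can be sketched with a toy model. The toyNode and toyState types, the small known array used for lookup, and the toy_* functions are all hypothetical simplifications; the real code works on clusterState and replies to the client with error messages instead of returning -1.

```c
#include <stddef.h>
#include <string.h>

#define CLUSTER_SLOTS 16384

typedef struct toyNode { char name[41]; } toyNode;

/* Toy stand-in for clusterState as seen from one node. */
typedef struct toyState {
    toyNode *myself;
    toyNode *known[8];            /* stand-in for the node lookup table */
    int num_known;
    toyNode *migrating_slots_to[CLUSTER_SLOTS];
    toyNode *importing_slots_from[CLUSTER_SLOTS];
    toyNode *slots[CLUSTER_SLOTS];
} toyState;

/* Stand-in for clusterLookupNode: find a node by its ID. */
toyNode *toy_lookup_node(toyState *st, const char *name) {
    for (int i = 0; i < st->num_known; i++)
        if (strcmp(st->known[i]->name, name) == 0) return st->known[i];
    return NULL;
}

/* SETSLOT <slot> MIGRATING <target>; returns 0 on success, -1 if rejected. */
int toy_setslot_migrating(toyState *st, int slot, const char *target) {
    if (st->slots[slot] != st->myself) return -1;  /* step 1: must own the slot */
    toyNode *n = toy_lookup_node(st, target);      /* step 2: look up the node */
    if (n == NULL) return -1;
    st->migrating_slots_to[slot] = n;              /* step 3: record the mark */
    return 0;
}

/* SETSLOT <slot> IMPORTING <source>; returns 0 on success, -1 if rejected. */
int toy_setslot_importing(toyState *st, int slot, const char *source) {
    if (st->slots[slot] == st->myself) return -1;  /* step 1: must not own it yet */
    toyNode *n = toy_lookup_node(st, source);      /* step 2: look up the node */
    if (n == NULL) return -1;
    st->importing_slots_from[slot] = n;            /* step 3: record the mark */
    return 0;
}
```

Note that neither function touches the slots array: marking a migration does not change ownership, which only happens when the migration result is marked at the end.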

I have also drawn two diagrams showing the basic logic by which the clusterCommand function handles the migrating and importing flags of the CLUSTER SETSLOT command, which you can use for review.

Handling the migrating flag

Handling the importing flag

With this, after marking the source and target nodes for migration in Redis Cluster, we can use the CLUSTER GETKEYSINSLOT command to retrieve the keys to be migrated. Let’s take a look at the implementation of this step.

Retrieving the Keys to be Migrated #

The specific command we use to retrieve the keys to be migrated is as follows, where <slot> represents the slot to be migrated and <count> represents the number of keys to be migrated:

CLUSTER GETKEYSINSLOT <slot> <count>

Since we are still using the CLUSTER command here, the command processing for obtaining the keys to be migrated is still performed in the clusterCommand function, corresponding to the code branch of the GETKEYSINSLOT option, as shown below:

void clusterCommand(client *c) {
    ...
    // Handle the GETKEYSINSLOT option
    else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {...}
    ...
}

The processing logic of this code branch is relatively simple, and it can be divided into three steps.

First, this code branch calls the getLongLongFromObjectOrReply function (in the object.c file) to parse the <slot> and <count> parameters of the CLUSTER GETKEYSINSLOT command, assigning <count> to the maxkeys variable, as shown below:

// Parse 'slot' parameter
if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
      return;
// Parse 'count' parameter and assign it to 'maxkeys'
if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)!= C_OK)
      return;

Then, the clusterCommand function will call the countKeysInSlot function from the “db.c” file to obtain the actual number of keys in the slot to be migrated. If the migration count maxkeys obtained from the command is greater than the actual number of keys, the value of maxkeys will be updated to the actual number of keys. Next, the clusterCommand function will allocate space for these keys.

unsigned int keys_in_slot = countKeysInSlot(slot);  // Obtain the actual number of keys in the migration slot
if (maxkeys > keys_in_slot) maxkeys = keys_in_slot; // Clamp 'maxkeys' to the actual number of keys in the slot
keys = zmalloc(sizeof(robj*)*maxkeys); // Allocate space for the keys

Finally, this code branch will call the getKeysInSlot function from the “db.c” file to obtain the actual keys from the migration slot and return these keys to the client. The code is shown below:

numkeys = getKeysInSlot(slot, keys, maxkeys); // Obtain the actual keys
addReplyMultiBulkLen(c,numkeys); // Return the keys to the client
for (j = 0; j < numkeys; j++) {
     addReplyBulk(c,keys[j]);
     decrRefCount(keys[j]);
}
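The clamp-then-collect logic of this branch can be sketched as follows. The function toy_get_keys_in_slot and the plain string array standing in for the keys of a slot are hypothetical; the real code reads keys from the slots_to_keys radix tree.

```c
#include <stddef.h>

/* Copy at most maxkeys keys of a slot into out and return how many were
 * copied. slot_keys/keys_in_slot stand in for the keys stored in the slot. */
size_t toy_get_keys_in_slot(const char *const *slot_keys, size_t keys_in_slot,
                            const char **out, size_t maxkeys) {
    /* Never return more keys than the slot actually holds. */
    if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;
    for (size_t j = 0; j < maxkeys; j++) out[j] = slot_keys[j];
    return maxkeys;
}
```

A client that wants to drain a slot would call the command repeatedly, migrating each returned batch, until zero keys come back.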

So far, the client has obtained a certain number of keys to be migrated through the CLUSTER GETKEYSINSLOT command. Next, we will start the actual migration operation. Let’s take a closer look.

Actual Data Migration on the Source Node #

When performing actual data migration, we need to execute the MIGRATE command on the source node that contains the data to be migrated. Actually, the MIGRATE command supports migrating both a single key and multiple keys at once, and the basic processing flow of these two scenarios is the same, which is implemented in the migrateCommand function.

Here, I will take the example of migrating multiple keys at once with the MIGRATE command. This command includes options such as the IP address and port number of the destination node, the database ID, the timeout for migration, and the multiple keys to be migrated. The format of the command is as follows:

MIGRATE host port "" dbid timeout [COPY] [REPLACE] KEYS key1 key2 ... keyN

From this command, you can also see the COPY and REPLACE options, which have the following meanings:

  • COPY: The keys are copied rather than moved; that is, they are not deleted from the source node after being sent to the destination node.
  • REPLACE: Existing keys on the destination node are overwritten. Without REPLACE, the destination node returns an error if a key to be migrated already exists there.

Okay, after understanding the meaning of the MIGRATE command, let’s take a look at the basic processing flow of the migrateCommand function, which can be divided into four steps.

Step 1: Command Parameter Validation

The migrateCommand function first checks the parameters carried by the MIGRATE command, such as whether there are COPY or REPLACE flags, whether the dbid and timeout can be read correctly, etc. In this step, if the timeout value is found to be less than or equal to 0, the timeout value will be set to 1000 milliseconds for timeout judgment during migration.
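Here is a minimal sketch of this validation step, assuming the argument layout MIGRATE host port key-or-"" dbid timeout [options...]. The toyMigrateArgs type and the toy_* functions are hypothetical, and only the COPY, REPLACE, and KEYS options are handled; anything else is treated as a syntax error.

```c
#include <ctype.h>
#include <stdlib.h>

typedef struct toyMigrateArgs {
    int copy;        /* COPY option present */
    int replace;     /* REPLACE option present */
    long timeout_ms; /* effective timeout in milliseconds */
    int first_key;   /* index of the first key in argv */
    int num_keys;    /* number of keys to migrate */
} toyMigrateArgs;

/* Case-insensitive string equality. */
int toy_ieq(const char *a, const char *b) {
    while (*a && *b) {
        if (tolower((unsigned char)*a) != tolower((unsigned char)*b)) return 0;
        a++; b++;
    }
    return *a == '\0' && *b == '\0';
}

/* Returns 0 on success, -1 on a syntax error. */
int toy_parse_migrate(int argc, const char **argv, toyMigrateArgs *out) {
    if (argc < 6) return -1;
    out->copy = out->replace = 0;
    out->first_key = 3;  /* single-key form: the key is argv[3] */
    out->num_keys = 1;
    for (int j = 6; j < argc; j++) {
        if (toy_ieq(argv[j], "copy")) {
            out->copy = 1;
        } else if (toy_ieq(argv[j], "replace")) {
            out->replace = 1;
        } else if (toy_ieq(argv[j], "keys")) {
            if (argv[3][0] != '\0') return -1; /* key must be "" with KEYS */
            out->first_key = j + 1;
            out->num_keys = argc - j - 1;
            break;                             /* the rest are all keys */
        } else {
            return -1;
        }
    }
    out->timeout_ms = strtol(argv[5], NULL, 10);
    if (out->timeout_ms <= 0) out->timeout_ms = 1000; /* fall back to 1000 ms */
    return 0;
}
```

The fallback to 1000 milliseconds matters because the later send and receive calls are synchronous, so the timeout bounds how long each of them can stall the source node.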

Step 2: Read the Keys and Values to Be Migrated

After checking the command parameters, the migrateCommand function allocates two arrays, ov and kv, whose initial sizes are equal to the number of keys to be migrated in the MIGRATE command. Then, the migrateCommand function calls the lookupKeyRead function (in the db.c file) to check one by one whether the keys to be migrated exist. This is because some keys may already have expired by the time the MIGRATE command runs, so there is no need to migrate them. At the end of this step, the migrateCommand function sets the number of keys to be migrated to the number of keys that actually exist.

The following code shows the basic logic of this step:

ov = zrealloc(ov,sizeof(robj*)*num_keys);  // Allocate the 'ov' array to store the values to be migrated
kv = zrealloc(kv,sizeof(robj*)*num_keys); // Allocate the 'kv' array to store the keys to be migrated
...
for (j = 0; j < num_keys; j++) {
    // Check whether the key to be migrated exists one by one
   if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
            kv[oi] = c->argv[first_key+j]; // Only record the existing keys
            oi++;
   }
}
num_keys = oi; // The number of keys to be migrated is equal to the actual number of keys that exist

Step 3: Fill in the migration command, key, and value

Next, the migrateCommand function starts preparing for the data migration. The operations in this step mainly include:

  • Call the migrateGetSocket function (in the cluster.c file) to establish a connection with the destination node;
  • Call the rioInitWithBuffer function to initialize a buffer, and then call functions such as rioWriteBulkString and rioWriteBulkLongLong (in the rio.c file) to fill the command, key, and value to be sent to the destination node into this buffer.

The following code shows the main commands, keys, and values that are filled in during this step:

rioInitWithBuffer(&cmd,sdsempty()); // Initialize buffer
...  // Fill SELECT command into buffer
// For each key to be migrated, fill in the command, key, and value into the buffer
for (j = 0; j < num_keys; j++) {
    // In cluster mode, fill in the RESTORE-ASKING command to be sent to the destination node
    if (server.cluster_enabled)
        serverAssertWithInfo(c,NULL, rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
   ...
   // Fill in the key
   serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
                sdslen(kv[j]->ptr)));
   // Fill in the TTL
   serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
   // Call the createDumpPayload function to serialize the value
   createDumpPayload(&payload,ov[j],kv[j]);
   // Fill in the value
   serverAssertWithInfo(c,NULL, rioWriteBulkString(&cmd,payload.io.buffer.ptr,
   ...
}

Here, it is important to note that the migrateCommand function calls the createDumpPayload function (in the cluster.c file) to serialize the value of the key to be migrated for transmission. In the serialized result, the createDumpPayload function adds the RDB version number and the CRC checksum. When the destination node receives the migration data, it will also check these two parts. I will explain it to you later.

After filling in the commands, keys, and values to be sent to the destination node in the buffer, the migrateCommand function begins to send the content in this buffer.

Step 4: Send the migration commands and data, and read the return results

The migrateCommand function calls the syncWrite function (in the syncio.c file) to send the content in the buffer to the destination node in 64KB increments, as shown below:

while ((towrite = sdslen(buf)-pos) > 0) {
    towrite = (towrite > (64*1024) ? (64*1024) : towrite);
    nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
    ...
    pos += nwritten;
}
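The chunked sending loop can be tried out in isolation with the sketch below. The toy_write_fn callback type stands in for syncWrite, and all toy_* names are hypothetical; the callback here only counts calls instead of writing to a socket.

```c
#include <stddef.h>

#define TOY_CHUNK (64 * 1024)

/* Stand-in for syncWrite: returns the number of bytes written. */
typedef long (*toy_write_fn)(const char *ptr, size_t len, void *ctx);

/* Send len bytes in pieces of at most 64KB. Returns the total number of
 * bytes sent, or -1 as soon as one write comes back short. */
long toy_send_in_chunks(const char *buf, size_t len,
                        toy_write_fn write_fn, void *ctx) {
    size_t pos = 0;
    while (pos < len) {
        size_t towrite = len - pos;
        if (towrite > TOY_CHUNK) towrite = TOY_CHUNK;
        long nwritten = write_fn(buf + pos, towrite, ctx);
        if (nwritten != (long)towrite) return -1; /* treat as a write error */
        pos += (size_t)nwritten;
    }
    return (long)pos;
}

/* Example callback: pretend every write succeeds and count the calls. */
long toy_count_chunks(const char *ptr, size_t len, void *ctx) {
    (void)ptr;
    (*(int *)ctx)++;
    return (long)len;
}
```

With a 150KB buffer, the loop issues three writes of 64KB, 64KB, and 22KB. Each write blocks until it completes or times out, which is why large payloads directly translate into longer stalls on the source node.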

Then, for each key-value pair sent to the destination node, the migrateCommand function calls the syncReadLine function (in the syncio.c file) to read the return result from the destination node. If there is an error message in the return result, it will be processed accordingly. This logic is not complicated, but there are many handling cases for various error situations. You can further read the source code for learning.

// For each key-value pair to be migrated, call the syncReadLine function to read the return result from the destination node
for (j = 0; j < num_keys; j++) {
   if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) { ...}
   ... // Process the return result from the destination node
}

Alright, at this point you have learned the basic flow of executing the MIGRATE command. I have also drawn a diagram that depicts the four major steps of this execution process, which you can use for review.

In fact, when migrating data, the processing of the migration command by the destination node is also an important part of the migration process. So now let’s take a look at the processing steps of the destination node after receiving the RESTORE-ASKING command.

Processing the Migrated Data on the Destination Node #

After the destination node receives the RESTORE-ASKING command sent by the source node, the actual processing function for this command is restoreCommand (in the cluster.c file). The logic of this function is not complicated and can mainly be divided into three steps.

First, it parses the received command parameters, including the replace flag (whether to overwrite existing data), the ttl value (the expiration time of the key), the idletime value (the LRU idle time of the key), and the freq value (the LFU access frequency of the key). Then, it performs a series of checks based on these parameters.

For example, if the replace flag is not set, it calls the lookupKeyWrite function (in the db.c file) to check whether the key to be migrated already exists in the destination node’s database. If it does, an error message is returned, as shown below. In addition, the function checks whether the TTL value is less than 0.

// If there is no replace flag and the key to be migrated already exists in the database of the destination node 
if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
        addReply(c,shared.busykeyerr); // Return an error message
        return;
 }

Then, the restoreCommand function will check the serialization result of the value of the migrated key. As I mentioned earlier, when the migrateCommand function actually migrates the value, it serializes the value before transmission. The serialized result includes the RDB version and CRC checksum. The restoreCommand function will call the verifyDumpPayload function (in the cluster.c file) to check the RDB version and CRC checksum. If these two parts are not correct, it will return an error message.

// Check RDB version and CRC checksum in the serialized value
if (verifyDumpPayload(c->argv[3]->ptr, sdslen(c->argv[3]->ptr)) == C_ERR) {
    addReplyError(c, "DUMP payload version or checksum are wrong");
    return;
}
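To illustrate the idea of a version-plus-checksum footer, here is a minimal sketch. It assumes a layout of serialized value, then a 2-byte version, then an 8-byte checksum. The checksum used here is FNV-1a, a simple stand-in chosen to keep the example short; it is not the CRC algorithm Redis uses, and TOY_RDB_VERSION and the toy_* functions are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_RDB_VERSION 9   /* assumed version number, for illustration only */
#define TOY_FOOTER_LEN 10   /* 2-byte version + 8-byte checksum */

/* Stand-in checksum (64-bit FNV-1a), NOT the CRC used by Redis. */
uint64_t toy_checksum(const unsigned char *p, size_t len) {
    uint64_t h = 14695981039346656037ULL;
    for (size_t i = 0; i < len; i++) { h ^= p[i]; h *= 1099511628211ULL; }
    return h;
}

/* Write val plus footer into out (needs len + TOY_FOOTER_LEN bytes).
 * Returns the total payload length. */
size_t toy_create_payload(const unsigned char *val, size_t len,
                          unsigned char *out) {
    memcpy(out, val, len);
    out[len] = TOY_RDB_VERSION & 0xff;            /* version, little endian */
    out[len + 1] = (TOY_RDB_VERSION >> 8) & 0xff;
    uint64_t sum = toy_checksum(out, len + 2);    /* covers value + version */
    for (int i = 0; i < 8; i++)
        out[len + 2 + i] = (unsigned char)(sum >> (8 * i));
    return len + TOY_FOOTER_LEN;
}

/* Returns 0 if the version is supported and the checksum matches, else -1. */
int toy_verify_payload(const unsigned char *p, size_t len) {
    if (len < TOY_FOOTER_LEN) return -1;
    const unsigned char *footer = p + len - TOY_FOOTER_LEN;
    unsigned int version = footer[0] | (footer[1] << 8);
    if (version > TOY_RDB_VERSION) return -1;     /* payload from a newer format */
    uint64_t stored = 0;
    for (int i = 0; i < 8; i++) stored |= (uint64_t)footer[2 + i] << (8 * i);
    return stored == toy_checksum(p, len - 8) ? 0 : -1;
}
```

The version check lets the destination node reject payloads it cannot parse, while the checksum catches corruption in transit before the value is loaded.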

Next, the restoreCommand function will call the rdbLoadObjectType and rdbLoadObject functions (in the rdb.c file) to parse the actual value type and value from the serialized result.

Finally, the restoreCommand function will call the dbAdd function to write the parsed key and value into the destination node’s database. Here, it’s important to note that if the migration command has the REPLACE flag, the restoreCommand function will first call the dbDelete function to delete the existing migration key in the destination node’s database, and then call the dbAdd function to write the migration key. In addition, the restoreCommand function will also set the expiration time for the migration key, as well as LRU or LFU information, and finally return a success message.

The following code shows the final step of the restoreCommand function’s logic:

// If the REPLACE flag is present, delete the existing migration key in the destination node's database
if (replace) dbDelete(c->db, c->argv[1]);

// Write the migration key and value into the destination node's database
dbAdd(c->db, c->argv[1], obj);
if (ttl) {
    if (!absttl) ttl += mstime();
    setExpire(c, c->db, c->argv[1], ttl);
}
objectSetLRUOrLFU(obj, lfu_freq, lru_idle, lru_clock); // Set LRU or LFU information
...
addReply(c, shared.ok); // Return success message
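The overall decision logic on the destination node (existence check, optional replace, write, expiration) can be sketched with a toy in-memory table. The toyDb and toyEntry types, the error codes, and the toy_* functions are hypothetical; payload parsing and LRU/LFU handling are left out, and the current time is passed in as a parameter instead of being read from a clock.

```c
#include <stddef.h>
#include <string.h>

#define TOY_DB_CAP 16

typedef struct toyEntry {
    const char *key;
    const char *val;
    long long expire_at_ms; /* 0 means no expiration */
    int used;
} toyEntry;

typedef struct toyDb { toyEntry e[TOY_DB_CAP]; } toyDb;

enum { TOY_OK = 0, TOY_BUSYKEY = -1, TOY_BAD_TTL = -2, TOY_FULL = -3 };

/* Find a live entry by key, or return NULL. */
toyEntry *toy_find(toyDb *db, const char *key) {
    for (int i = 0; i < TOY_DB_CAP; i++)
        if (db->e[i].used && strcmp(db->e[i].key, key) == 0) return &db->e[i];
    return NULL;
}

/* Toy RESTORE: ttl_ms is relative; now_ms is the caller-supplied clock. */
int toy_restore(toyDb *db, const char *key, const char *val,
                long long ttl_ms, int replace, long long now_ms) {
    toyEntry *old = toy_find(db, key);
    if (old && !replace) return TOY_BUSYKEY; /* key exists and no REPLACE */
    if (ttl_ms < 0) return TOY_BAD_TTL;
    if (old) old->used = 0;                  /* with REPLACE: delete first */
    for (int i = 0; i < TOY_DB_CAP; i++) {
        if (!db->e[i].used) {
            db->e[i].key = key;
            db->e[i].val = val;
            db->e[i].expire_at_ms = ttl_ms ? now_ms + ttl_ms : 0;
            db->e[i].used = 1;
            return TOY_OK;
        }
    }
    return TOY_FULL;
}
```

Converting the relative TTL to an absolute expiration time at write time mirrors the `ttl += mstime()` line in the snippet above.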

I have also created a diagram to illustrate the basic process of the destination node handling the migration data. Please take a look at it for an overview.

Migration Process

So far, you have understood how the source node sends migration data and how the destination node receives it. Finally, when all the keys in the migration slot have been migrated, we need to execute the CLUSTER SETSLOT command to mark the final result of the migration. Let’s take a look at that next.

Marking the Migration Result #

After the data migration is completed, we need to first execute the CLUSTER SETSLOT command on the destination node to mark the final owner of the migrated slot, as shown below. Then, we need to execute the same command on the source node to mark the final owner of the migrated slot on the source node as well.

CLUSTER SETSLOT <slot> NODE <node>

Since this command is still a CLUSTER command, its processing is still implemented in the clusterCommand function. The option for this command is SETSLOT and it has the NODE flag, so its corresponding code branch looks like this:

void clusterCommand(client *c) {
    ...
    // Handle SETSLOT option
    else if (!strcasecmp(c->argv[1]->ptr, "setslot") && c->argc >= 4) {
        ...
        // Handle NODE flag
        else if (!strcasecmp(c->argv[3]->ptr, "node") && c->argc == 5) { ... }
        ...
    }
    ...
}

In the code branch that handles the NODE flag, the main task is to clear the marks in the migrating_slots_to array and importing_slots_from array on the node.

For the migrating_slots_to array, on the source node, the element corresponding to the migrated slot in this array records the destination node. So when we execute the migration result marking command on the source node, the code branch handling the NODE flag will call the countKeysInSlot function (in the db.c file) to check if there are any keys left in the migration slot. If there are no keys left, the element corresponding to the migrated slot in the migrating_slots_to array will be set to NULL, canceling the migration mark on the source node.

if (countKeysInSlot(slot) == 0 && server.cluster->migrating_slots_to[slot]) {
    server.cluster->migrating_slots_to[slot] = NULL;
}

For the importing_slots_from array, on the destination node, the element corresponding to the migrated slot in this array records the source node. So when we execute the migration result marking command on the destination node, the code branch handling the NODE flag will check if the node in the command’s argument is the destination node itself. If it is, the element corresponding to the migrated slot in the importing_slots_from array will be set to NULL, canceling the import mark on the destination node.

// If the node in the command's argument is the current node and there is an import mark
if (n == myself && server.cluster->importing_slots_from[slot]) {
    ...
    server.cluster->importing_slots_from[slot] = NULL; // Cancel the import mark
}

Finally, the code branch handling the NODE flag will call the clusterDelSlot and clusterAddSlot functions (in the cluster.c file) to update the slots array of the source and destination nodes respectively. You can read the code of these two functions for further understanding.
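The effect of the NODE flag on both sides can be sketched with a toy model. The toyNode and toyState types and the toy_setslot_node function are hypothetical; the keys_in_slot array stands in for countKeysInSlot, and a direct assignment to the slots array stands in for the clusterDelSlot and clusterAddSlot calls. Other checks in the real branch are omitted.

```c
#include <stddef.h>

#define CLUSTER_SLOTS 16384

typedef struct toyNode { const char *name; } toyNode;

/* Toy stand-in for clusterState as seen from one node. */
typedef struct toyState {
    toyNode *myself;
    toyNode *migrating_slots_to[CLUSTER_SLOTS];
    toyNode *importing_slots_from[CLUSTER_SLOTS];
    toyNode *slots[CLUSTER_SLOTS];
    int keys_in_slot[CLUSTER_SLOTS]; /* stand-in for countKeysInSlot */
} toyState;

/* Toy version of CLUSTER SETSLOT <slot> NODE <n>. */
void toy_setslot_node(toyState *st, int slot, toyNode *n) {
    /* Source side: the slot is empty, so the migrating mark can go. */
    if (st->keys_in_slot[slot] == 0 && st->migrating_slots_to[slot])
        st->migrating_slots_to[slot] = NULL;

    /* Destination side: the new owner is this node, so the import is done. */
    if (n == st->myself && st->importing_slots_from[slot])
        st->importing_slots_from[slot] = NULL;

    /* Record the new owner of the slot. */
    st->slots[slot] = n;
}
```

Running the same function on the destination node and then on the source node leaves both with no migration marks and with the slot assigned to the destination node.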

With that, the entire process of data migration in Redis Cluster is complete.

Summary #

In today’s lesson, I introduced you to the code implementation of Redis Cluster data migration process. You should grasp the following two key points.

The first is the clusterState data structure, which records the cluster state. This structure uses two arrays, migrating_slots_to and importing_slots_from, to record the migration status. It also uses the slots array to record the node that each slot belongs to, and the slots_to_keys radix tree to record the keys in each slot. You need to understand the meanings of these data structures because they are frequently used when reading the cluster source code.

The second is the five major steps of the data migration process. They are:

  • Marking the migrating and importing nodes;
  • Obtaining the keys to be migrated;
  • The source node actually migrates the data;
  • The destination node processes the migrated data;
  • Marking the migration result.

These five steps correspond to different options of the CLUSTER command, the MIGRATE command, and the RESTORE command. Therefore, their implementation logic is mainly in the clusterCommand, migrateCommand, and restoreCommand functions. If you want to learn more details about data migration, you can start by studying these functions.

Finally, I would like to remind you of two key points.

First, when Redis Cluster performs data migration, it calls the syncWrite and syncReadLine functions to synchronously send migration data to the destination node and synchronously read the reply result. This synchronous write and synchronous read process will block the source node from processing requests normally. So when migrating data, you need to control the number and size of the keys being migrated to avoid blocking Redis by migrating too many or too large keys at once.

Second, in practical applications, we use tools such as the redis-cli command-line tool or redis-trib, the Ruby-based Redis Cluster management tool, to perform data migration. These tools ultimately call the commands introduced in this lesson to actually migrate the data. Therefore, what you have learned today will also help you troubleshoot issues with redis-cli and redis-trib at the code level.

One question per lesson #

The clusterState data structure, which maintains the state of a Redis Cluster, contains a radix tree called slots_to_keys. It is updated when a key is inserted into the database. Can you find the relevant function calls in the Redis source file db.c that update the slots_to_keys radix tree?