14 Atomicity Guarantees from a Distributed Lock Implementation Perspective #

Distributed locks are an important application of Redis in real-world scenarios. When multiple clients concurrently access a shared resource, such as modifying a record in a database, we can use a Redis-based distributed lock to avoid conflicts: only the client that acquires the lock may operate on the shared resource.

For distributed locks, the key to the implementation is ensuring that the lock acquisition and release operations are atomic; only then can the lock behave correctly when multiple clients access it. From the previous lessons, you have learned that Redis uses its event-driven framework to capture readable events from multiple clients, i.e., their command requests. Additionally, starting from Redis 6.0, multiple IO threads are used to read requests and write back results concurrently.

Given this, you may wonder: Can the atomicity of distributed locks still be guaranteed?

In today’s lesson, I will walk you through the execution process of a command in the Redis server. Then, with the requirements of distributed locks in mind, we will examine how the atomicity of command execution is ensured. At the same time, we will also explore whether the atomicity of distributed locks is affected by IO multiplexing and multiple IO threads.

By doing so, you will not only understand how a single client’s command is executed and how its atomicity is guaranteed, but also be able to apply the knowledge you have learned so far. Understanding the execution process of client commands is also very helpful for troubleshooting Redis issues. You can insert checkpoints during command execution for analysis and debugging.

Alright, now let’s first understand the implementation methods of distributed locks, so that we can identify the corresponding commands used for implementing distributed locks and perform further analysis.

Implementation Methods of Distributed Locks #

In the first season of our course, we learned about the implementation of distributed locks; you can review that lesson if needed. Here, I will briefly introduce the commands used to acquire and release a distributed lock.

First, for the lock acquisition operation of a distributed lock, we can use the Redis SET command, which provides two options, NX and EX:

  • NX: if the key does not exist, Redis creates it; if the key already exists, Redis returns NULL without modifying the key.
  • EX: sets an expiration time for the key.

Therefore, we can have the client send the following command to acquire the lock. Here, lockKey is the name of the lock, uid is an ID with which the client uniquely identifies itself, and expireTime is the expiration time of the key that represents the lock. When this expiration time is reached, the key is deleted and the lock is released automatically, which avoids the problem of a lock that can never be released.

SET lockKey uid EX expireTime NX

If no client has created the lock yet, suppose client A sends this SET command to Redis, as shown below:

SET stockLock 1033 EX 30 NX

In this way, Redis creates the corresponding key, stockLock, whose value is this client’s ID, 1033. Now suppose another client B also sends a SET command, as shown below, attempting to set the value of the key stockLock to its own ID, 2033, that is, attempting to acquire the lock.

SET stockLock 2033 EX 30 NX

Since the NX option is used, if the key stockLock already exists, client B will not be able to modify it and therefore cannot acquire the lock. Thus, the locking effect is achieved.

As for releasing the lock, we can use the following Lua script, which is executed on the Redis server via the EVAL command. The script receives the lock’s key name as KEYS[1] and the client’s ID as ARGV[1]. It uses the GET command to read the value of the lock key and checks whether that value equals the calling client’s ID. If it does, the current client holds the lock, and the script executes the DEL command to delete the key and release the lock. If it does not, the script simply returns 0.

if redis.call("get", lockKey) == uid then
   return redis.call("del", lockKey)
else
   return 0
end

In this way, the client will not mistakenly delete a lock obtained by another client, ensuring the security of the lock.
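
For example, continuing the earlier scenario and assuming the script is passed inline, client A (ID 1033) could release the stockLock lock with an EVAL call like the one below. The 1 tells Redis that one key name follows, which is filled into KEYS[1], and the remaining argument becomes ARGV[1]:

EVAL "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" 1 stockLock 1033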

Now we understand the commands used to implement distributed locks. The question we need to clarify is this: under IO multiplexing, or when multiple IO threads are in use, will the SET command that acquires the lock, and the Lua script executed via the EVAL command that releases it, be executed concurrently?

This depends on the execution process of commands in Redis. Next, let’s see how a command is executed in Redis, and whether the multiple concurrent clients introduced by IO multiplexing and multiple IO threads can break the atomicity of commands.

Processing of a Command #

We already know that once the Redis server establishes a connection with a client, it registers a readable event for that connection in the event-driven framework, and this event corresponds to the client’s command request. The entire process of command processing can be divided into four stages, each of which corresponds to different functions in the Redis source code. Here, I list their entry functions, i.e., the functions from which each stage starts executing:

  • Command Reading: corresponds to the function readQueryFromClient;
  • Command Parsing: corresponds to the function processInputBufferAndReplicate;
  • Command Execution: corresponds to the function processCommand;
  • Result Return: corresponds to the function addReply.

Now let’s take a look at the basic flow of these four entry functions one by one, as well as the main call relationships within them.

Command Reading Stage: readQueryFromClient Function #

First, let’s understand the basic flow of the readQueryFromClient function.

The readQueryFromClient function reads data from the client connection’s socket, up to a maximum length of readlen. The value of readlen is given by the macro PROTO_IOBUF_LEN, which is defined in the server.h file with a default value of 16KB.
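
For reference, here is the macro definition in server.h:

#define PROTO_IOBUF_LEN (1024*16)  /* Generic I/O buffer size */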

After that, the readQueryFromClient function performs exception handling based on the result of the read, such as a read failure or the client closing the connection. In addition, if the current client is the master node in master-slave replication, the data read is appended to the buffer used to synchronize commands between the master and slave nodes.

Finally, the readQueryFromClient function calls the processInputBufferAndReplicate function, which enters the next stage of command processing, the command parsing stage.

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
   ...
   readlen = PROTO_IOBUF_LEN;  // Maximum length of data read from the client socket, default is 16KB
   ...
   c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);  // Allocate space for the buffer
   nread = read(fd, c->querybuf+qblen, readlen);  // Call read to read data from the client socket with the file descriptor fd
   ...
   processInputBufferAndReplicate(c);  // Call processInputBufferAndReplicate to further process the read content
}

I have drawn a diagram below to show the basic flow of the readQueryFromClient function. You can take a look.

Command Parsing Stage: processInputBufferAndReplicate Function #

The processInputBufferAndReplicate function (in the networking.c file) executes two branches based on whether the current client has the CLIENT_MASTER flag.

  • Branch One

This branch corresponds to a client without the CLIENT_MASTER flag, meaning the current client does not belong to the master node in master-slave replication. In this case, the processInputBufferAndReplicate function directly calls the processInputBuffer function (in the networking.c file) to parse the commands and arguments in the client input buffer. So, in this case, the function that actually performs command parsing is the processInputBuffer function. Let’s take a closer look at this function later.

  • Branch Two

This branch corresponds to a client with the CLIENT_MASTER flag, meaning the current client belongs to the master node in master-slave replication. In this case, in addition to calling the processInputBuffer function to parse the client commands, the processInputBufferAndReplicate function also calls the replicationFeedSlavesFromMasterStream function (in the replication.c file) to synchronize the commands received by the master node to the slave nodes.

The following diagram shows the basic execution logic of the processInputBufferAndReplicate function. Take a look.

Okay, just now we learned that the command parsing is actually performed in the processInputBuffer function. So now let’s understand the basic flow of this function.

First, the processInputBuffer function executes a while loop, continuously reading data from the client’s input buffer. Then, it checks the format of the command read to see if it starts with a “*”.

If the command starts with “*”, it indicates that this command is a PROTO_REQ_MULTIBULK type command request, which conforms to the RESP protocol (Redis client-server standard communication protocol). In this case, the processInputBuffer function will further call the processMultibulkBuffer function (in the networking.c file) to parse the command that was read.

If the command does not start with “*”, it is a PROTO_REQ_INLINE type command request, which does not follow the RESP protocol. Commands of this type are known as inline commands: plain text terminated by a newline sequence (“\r\n”). For example, the commands we send to Redis using Telnet are of the PROTO_REQ_INLINE type. In this case, the processInputBuffer function calls the processInlineBuffer function (in the networking.c file) to actually parse the command.
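
To make the difference concrete, here is the lock-acquisition command from earlier in its RESP-encoded form, which is what processMultibulkBuffer parses. The leading *6 announces six elements, and each $n gives the byte length of the element that follows (shown on separate lines here for readability; on the wire it is one contiguous byte stream):

*6\r\n
$3\r\nSET\r\n
$9\r\nstockLock\r\n
$4\r\n1033\r\n
$2\r\nEX\r\n
$2\r\n30\r\n
$2\r\nNX\r\n

An inline client such as Telnet would instead send the plain text SET stockLock 1033 EX 30 NX terminated by a newline.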

Once the command parsing is completed, the processInputBuffer function will call the processCommand function to enter the third phase of command processing, which is the command execution phase.

The following code shows the main flow of the processInputBuffer function during command parsing. You can take a look:

void processInputBuffer(client *c) {
    while(c->qb_pos < sdslen(c->querybuf)) {
        ...
        if (!c->reqtype) {
            // Determine the command type based on the first character of the client input buffer
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK; // Command conforming to the RESP protocol
            } else {
                c->reqtype = PROTO_REQ_INLINE;    // Inline command
            }
        }
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;    // For inline commands, call the processInlineBuffer function to parse
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break; // For RESP protocol commands, call the processMultibulkBuffer function to parse
        }
        ...
        if (c->argc == 0) {
            resetClient(c);
        } else {
            // Call the processCommand function to start executing the command
            if (processCommand(c) == C_OK) {
                ...
            }
            ...
        }
    }
    ...
}

The diagram below shows the basic execution flow of the processInputBuffer function. You can review it again.

Alright, now let’s continue with the third phase and look at the basic processing flow of the processCommand function during command execution.

Command Execution Phase: processCommand Function #

First of all, we need to know that the processCommand function is implemented in the server.c file. Its main logic before actually executing the command can be divided into three steps:

  • In the first step, the processCommand function calls the moduleCallCommandFilters function (in the module.c file), which lets loaded modules rewrite the command as they have registered to do.
  • In the second step, the processCommand function checks if the current command is a quit command and handles it accordingly.
  • In the third step, the processCommand function calls the lookupCommand function to search for the relevant command in the commands member variable of the global variable server.

Here, you need to pay attention to the commands member of the global variable server. It is a hash table, defined in the redisServer structure in the server.h file, as shown below:

struct redisServer {
    ...
    dict *commands;
    ...
};

In addition, the member variable commands is initialized in the initServerConfig function by calling the dictCreate function to create the hash table, and then by calling the populateCommandTable function to insert the Redis command names and their corresponding implementation functions into the hash table.

void initServerConfig(void) {
    ...
    server.commands = dictCreate(&commandTableDictType, NULL);
    ...
    populateCommandTable();
    ...
}

The populateCommandTable function uses the redisCommandTable array of redisCommand structures.

The redisCommandTable array is defined in the server.c file, and each element is a redisCommand structure representing a Redis command. In other words, the redisCommand structure records the implementation function for the current command.

For example, the following code shows the information for the GET and SET commands, with their respective implementation functions getCommand and setCommand. If you want to further understand the redisCommand structure, you can also look at its definition in the server.h file.

struct redisCommand redisCommandTable[] = {
    ...
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    ...
};
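
With this array in place, populateCommandTable simply walks it and inserts every entry into the server.commands hash table, keyed by command name. Here is a simplified sketch of that function, with flag parsing and error handling elided:

void populateCommandTable(void) {
    int j;
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

    for (j = 0; j < numcommands; j++) {
        struct redisCommand *c = redisCommandTable + j;
        ...
        // Key: the command name; value: a pointer to the redisCommand
        // entry, which records the command's implementation function
        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        ...
    }
}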

So far, you have learned that the lookupCommand function will search for the corresponding command in the commands hash table based on the parsed command name.
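
The lookup itself is just a single dictionary fetch on that hash table:

struct redisCommand *lookupCommand(sds name) {
    return dictFetchValue(server.commands, name);
}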

Once the command is found, the processCommand function will perform various checks, such as validating the command’s arguments, verifying the authenticated user who sent the command, and checking the current memory usage, etc. This part of the processing logic is quite extensive, and you can further read the processCommand function to understand it.

After finishing all these checks, the processCommand function starts executing the command. It first checks whether the current client has the CLIENT_MULTI flag. If it does, the command is part of a Redis transaction, so processCommand calls the queueMultiCommand function to enqueue the command, saving it for later execution.

If not, the processCommand function will call the call function to actually execute the command. The following code snippet shows this part of the logic:

// If the client has the CLIENT_MULTI flag and the current command is not EXEC, DISCARD, MULTI, or WATCH
if (c->flags & CLIENT_MULTI &&
    c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
    c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
    queueMultiCommand(c);   // Enqueue and save the command for later processing
    addReply(c, shared.queued);
} else {
    call(c, CMD_CALL_FULL); // Call the call function to actually execute the command
    ...
}

Here, you need to know that the call function is implemented in the server.c file. It executes the command by invoking the command’s implementation function through the function pointer stored in the redisCommand structure. As I mentioned earlier, each redisCommand structure records its command’s implementation function, and these structures can be found in the redisCommandTable array.
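
Stripped to its core, the dispatch inside the call function looks like this (timing statistics, command propagation, and other bookkeeping elided):

void call(client *c, int flags) {
    ...
    // Invoke the command's implementation function through the proc
    // function pointer recorded in the redisCommand structure,
    // e.g. setCommand for SET or getCommand for GET
    c->cmd->proc(c);
    ...
}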

Since the lock operation of distributed locks is implemented using the SET command, I will use the SET command as an example to introduce its actual execution process.

The implementation function of the SET command is setCommand, which is defined in the t_string.c file. The setCommand function first checks the command parameters, such as whether there are options like NX, EX, XX, PX, etc. If there are, the setCommand function records these flags.
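
A simplified sketch of this option-parsing loop is shown below; only the NX branch appears here, while EX, XX, and PX are handled in the same pattern:

void setCommand(client *c) {
    int j;
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_SET_NO_FLAGS;

    // Scan the arguments that follow the key and value, recording a
    // flag for each option found
    for (j = 3; j < c->argc; j++) {
        char *a = c->argv[j]->ptr;
        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0')
        {
            flags |= OBJ_SET_NX;
        }
        ...
    }
    ...
    setGenericCommand(c, flags, c->argv[1], c->argv[2], expire, unit, NULL, NULL);
}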

Then, the setCommand function calls the setGenericCommand function, which is also implemented in the t_string.c file. The setGenericCommand function processes the command parameters based on the flags recorded by the setCommand function. For example, if the command parameters contain the NX option, the setGenericCommand function calls the lookupKeyWrite function (defined in the db.c file) to check whether the key to execute the SET command already exists.

If the key already exists and the command parameters contain the NX option, the setGenericCommand function calls the addReply function to return NULL, which is exactly the behavior the distributed lock relies on.

The following code shows this execution logic:

// Abort if the NX option is set but the key already exists, or the XX option is set but the key does not exist
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
    (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
    addReply(c, abort_reply ? abort_reply : shared.nullbulk);  // Return a NULL reply without modifying the key
    return;
}

Alright, if the SET command can be executed normally, that is, if the command contains the NX option but the key does not exist, or if the command contains the XX option but the key already exists, the setGenericCommand function calls the setKey function (defined in the db.c file) to actually insert the key-value pair, as shown below:

setKey(c->db,key,val);

Then, if the command sets an expiration time, the setGenericCommand function calls the setExpire function to set the expiration time. Finally, the setGenericCommand function calls the addReply function to return the result to the client, as shown below:

addReply(c, ok_reply ? ok_reply : shared.ok);

Okay, with these steps, the execution of the SET command is complete. You can also take a look at the basic flowchart below.

Flowchart

Furthermore, you can see that whether a command’s execution conditions are not met or the command executes successfully, the addReply function is called in either case to return the result. This brings us to the last stage of the command processing process: the result return stage.

Result Return Stage: addReply Function #

The addReply function is defined in the networking.c file, and its execution logic is relatively simple. It mainly calls the prepareClientToWrite function, which in turn calls the clientInstallWriteHandler function to add the client waiting for write-back to the server’s global clients_pending_write list.

Then, the addReply function calls functions like _addReplyToBuffer (defined in networking.c) to add the result to the client’s output buffer.
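
Putting these two steps together, a simplified view of addReply (showing only the branch for sds-encoded reply objects, with the fallback path elided) looks roughly like this:

void addReply(client *c, robj *obj) {
    // Register the client for deferred write-back; internally this calls
    // clientInstallWriteHandler to append the client to the
    // server.clients_pending_write list
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        // Append the result to the client's fixed-size output buffer;
        // if the buffer is full, fall back to the reply list
        if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK)
            ...
    }
    ...
}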

Now you understand how a command is read, parsed, executed, and finally returned to the client. The following diagram shows this process and the main functions involved. You can review it again.

Command Execution Process

However, you also need to pay attention to one more point: as long as the main IO thread handles the entire command processing flow, the atomicity of command execution is guaranteed, and the atomicity of distributed locks is guaranteed accordingly.

But when this processing flow is combined with the IO multiplexing mechanism and the multiple-IO-thread mechanism we introduced earlier, at which stages do these two mechanisms come into play, and do they affect the atomicity of command execution?

Next, let’s take a look at the impact of these mechanisms on the guarantee of atomicity.

Impact of IO Multiplexing on Command Atomicity #

First, you need to understand that the IO multiplexing mechanism comes into play before the readQueryFromClient function executes. Within the event-driven framework, the aeApiPoll function is called to obtain a batch of ready socket descriptors, and a loop then triggers the readQueryFromClient callback for each ready descriptor in turn.

In this way, even if the IO multiplexing mechanism obtains multiple ready socket descriptors at once, the Redis main thread still processes them one by one through the callback function. Moreover, write events are handled one by one in the same way.

The following code demonstrates the logic of acquiring a batch of events through the aeApiPoll function and processing them one by one. You can take a look.

numevents = aeApiPoll(eventLoop, tvp);

for (j = 0; j < numevents; j++) {
    int fd = eventLoop->fired[j].fd;
    aeFileEvent *fe = &eventLoop->events[fd];
    int mask = eventLoop->fired[j].mask;
    ...
    if (!invert && fe->mask & mask & AE_READABLE) {
        // Invoke the registered read callback, i.e., readQueryFromClient
        fe->rfileProc(eventLoop, fd, fe->clientData, mask);
        fired++;
    }
}

Therefore, even when using IO multiplexing, the entire command processing can still be completed by the main IO thread and the atomicity of command execution can still be guaranteed. The following diagram illustrates the relationship between IO multiplexing and the command processing. You can take a look.

IO Multiplexing and Command Processing

Next, let’s take a look at the impact of multiple IO threads on command atomicity.

Impact of Multiple IO Threads on Command Atomicity #

We know that multiple IO threads can perform read or write operations. For read operations, the readQueryFromClient function, during its execution, calls the postponeClientRead function to add the client waiting to be read to the clients_pending_read list. You can review this process in Lesson 13.

Then, the clients waiting to be read are distributed across the IO threads for processing. Each IO thread executes the readQueryFromClient function, which reads the command data and further calls the processInputBuffer function to parse the command. Up to this point, the basic flow is the same as in the code before Redis 6.0.

However, compared with the pre-6.0 code, the processInputBuffer function in Redis 6.0 adds a new condition: if the client’s flags contain CLIENT_PENDING_READ, then after parsing the command, processInputBuffer only adds the CLIENT_PENDING_COMMAND flag to the client and exits the command parsing loop.

At this point, the processInputBuffer function has parsed only the first command and does not actually call the processCommandAndResetClient function to execute it, as shown below:

void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        ...
        if (c->argc == 0) {
            resetClient(c);
        } else {
            // If the client flag contains CLIENT_PENDING_READ, add the CLIENT_PENDING_COMMAND flag,
            // then exit the loop without calling the processCommandAndResetClient function to execute the command
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }
            if (processCommandAndResetClient(c) == C_ERR) {
                return;
            }
        }
    }
}

Therefore, when all IO threads have finished parsing their first commands, the handleClientsWithPendingReadsUsingThreads function executed in the main IO thread calls the processCommandAndResetClient function to execute each command, and the processInputBuffer function to parse any remaining commands. You can review this part in Lesson 13.
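
In simplified form, the tail of the handleClientsWithPendingReadsUsingThreads function looks roughly like this; the fan-out of clients to the IO threads and the wait loop are elided:

while (listLength(server.clients_pending_read)) {
    ln = listFirst(server.clients_pending_read);
    client *c = listNodeValue(ln);
    c->flags &= ~CLIENT_PENDING_READ;
    listDelNode(server.clients_pending_read, ln);

    // The IO thread parsed a first command: execute it in the main thread
    if (c->flags & CLIENT_PENDING_COMMAND) {
        c->flags &= ~CLIENT_PENDING_COMMAND;
        if (processCommandAndResetClient(c) == C_ERR) continue;
    }
    // Parse (and execute) whatever remains in the input buffer
    processInputBuffer(c);
}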

So now, you can see that even when using multiple IO threads, the command execution phase is still performed by the main IO thread. Thus, the atomicity of command execution can still be guaranteed, and the atomicity of distributed locks can also be guaranteed.

Let’s take a look at the flow of writing back data.

In this phase, the addReply function has already deferred the client’s write-back operation, and by this point the Redis command has finished executing. So even if multiple IO threads write back data to clients simultaneously, this only returns results to the clients and does not affect the commands’ execution results inside the Redis server. In other words, even when using multiple IO threads for write-back, Redis can still guarantee the atomicity of command execution.
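
As a sketch, the distribution step in the handleClientsWithPendingWritesUsingThreads function (in the networking.c file of Redis 6.0) looks roughly like this; the write-back work is simply spread across the IO threads in round-robin fashion, after the commands have already run:

listRewind(server.clients_pending_write, &li);
int item_id = 0;
while ((ln = listNext(&li))) {
    client *c = listNodeValue(ln);
    c->flags &= ~CLIENT_PENDING_WRITE;
    // Assign each client to an IO thread in round-robin order
    int target_id = item_id % server.io_threads_num;
    listAddNodeTail(io_threads_list[target_id], c);
    item_id++;
}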

The following diagram shows the threads responsible for each phase of the command processing when using multiple IO threads. You can review it again.

Summary #

In today’s class, I combined the atomicity requirement of distributed locks with a walkthrough of the entire process by which Redis handles a command. In particular, you need to focus on how distributed locks are implemented.

As we know, the lock and unlock operations can be realized using the SET command and Lua scripts with the EVAL command, respectively. Therefore, the atomicity guarantee of distributed locks mainly relies on the atomicity guarantee of the SET and EVAL commands when executed in the Redis server.

Next, I analyzed the entire command processing flow in Redis in detail, dividing it into four stages: command reading, command parsing, command execution, and result return. You should also understand the main flow of the functions executed in each of these four stages.

Before Redis 6.0, these four stages were all executed by the main IO thread. Although Redis uses the IO multiplexing mechanism, this mechanism only obtains multiple ready socket descriptors at once, corresponding to multiple clients sending command requests. In the main IO thread, Redis still handles each client’s command one by one, so the atomicity of command execution is still guaranteed.

After Redis 6.0, the reading, parsing, and result write-back stages of command processing can be handled by multiple IO threads. However, you don’t need to worry: the IO threads only parse the first command they receive, and the actual execution of commands is still handled by the main IO thread. And when multiple IO threads write back results concurrently, the commands have already finished executing, so there is no conflict between the IO threads. Therefore, even with multiple IO threads, the atomicity of command execution can still be guaranteed.

Finally, I would like to share my observations on multiple IO threads. From today’s lesson, you can see that multiple IO threads do not actually speed up the execution of commands themselves. They only parallelize the reading, parsing, and write-back of commands, and the reading and parsing apply only to the first command each thread receives. This design mainly reflects the fact that it is network IO, not command execution, that needs accelerating. So if the execution of commands itself becomes a bottleneck at runtime, you can consider using a Redis sharded cluster to improve processing efficiency.

One Question per Lesson #

If the command execution stage of the command processing flow were also handed over to multiple IO threads, what other benefits or negative effects do you think there might be, besides the impact on atomicity?

Feel free to share your answers and insights in the comments. If you find it helpful, feel free to share today’s content with more friends.