41 How to Design a More Optimized Distributed Lock

41 How to design a more optimized distributed lock #

Hello, I’m Liu Chao.

Starting from this lecture, we officially enter the study of the last module. The content of this comprehensive practical exercise comes from some cases I have personally experienced, and the knowledge points used will be relatively comprehensive. Now it’s time to put into practice what we have learned!

During last year’s Double Eleven shopping festival, our game mall also ran a promotion. At that time, I noticed that the most common exception in the database operation logs was the Interrupted Exception, almost all of which were from a SQL that verifies the idempotence of orders.

Because the verification of order idempotence is the first database operation in the process of submitting an order, it bears a relatively large number of requests. In addition, we implement idempotence verification based on a database table, so there were some cases of request transaction timeouts and interrupted transactions. In fact, implementing idempotence verification based on a database is a way to implement distributed locks.

So what is a distributed lock and what problems does it solve?

In a JVM, in the case of multi-threaded concurrency, we can use synchronized locks or Lock locks to ensure that only one thread can modify shared variables or execute code blocks at the same time. However, now our services are mostly deployed based on a distributed cluster. For some shared resources, such as inventory that we discussed before, using Java locks in a distributed environment becomes ineffective.

In this case, we need to implement distributed locks to ensure the atomicity of shared resources. In addition, distributed locks are also frequently used to avoid repetitive work by different nodes in a distributed system. For example, in a distributed cluster, we only need to ensure that one service node sends a message, and we must avoid multiple nodes sending the same message to the same user.

Because implementing a distributed lock based on a database is relatively simple and easy to understand, and there is no need to introduce third-party middleware, it is the preferred choice for many distributed businesses to implement distributed locks. However, the distributed lock implemented by the database has certain performance bottlenecks.

Next, let’s understand how to use a database to implement a distributed lock, what are the performance bottlenecks, and if there are other implementation methods to optimize distributed locks.

Implementing Distributed Lock with Database #

First, we need to create a lock table to ensure the atomicity of the data by creating and querying data:

CREATE TABLE `order` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `order_no` int(11) DEFAULT NULL,
  `pay_money` decimal(10, 2) DEFAULT NULL,
  `status` int(4) DEFAULT NULL,
  `create_date` datetime(0) DEFAULT NULL,
  `delete_flag` int(4) DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  INDEX `idx_status`(`status`) USING BTREE,
  INDEX `idx_order`(`order_no`) USING BTREE
) ENGINE = InnoDB

Next, if we are checking the idempotence of an order, we need to first query whether the record exists in the database. During the query, phantom reads should be prevented. If the record does not exist, we insert it into the database; otherwise, we abandon the operation.

SELECT id FROM `order` WHERE `order_no` = 'xxxx' FOR UPDATE

Finally, besides preventing phantom reads during queries, we also need to ensure that the query and insert operations are within the same transaction. Therefore, we need to declare the transaction. The specific implementation code is as follows:

@Transactional
public int addOrderRecord(Order order) {
    if (orderDao.selectOrderRecord(order) == null) {
        int result = orderDao.addOrderRecord(order);
        if (result > 0) {
            return 1;
        }
    }
    return 0;
}

With this, we have implemented a distributed lock for checking the idempotence of orders. I believe you can see why this approach can have performance bottlenecks. We mentioned in [Lesson 34] that, in the RR transaction level, the FOR UPDATE operation for SELECT is implemented based on gap locks, which is a pessimistic locking method. Therefore, it has blocking issues.

In high-concurrency scenarios, when a large number of requests come in, most of the requests will be queued and waiting. To ensure the stability of the database, the transaction timeout is often set to a small value, resulting in a large number of interrupted transactions.

In addition to blocking waits, because there is no deletion operation for orders, the data in this lock table will gradually accumulate. We need to set up another thread to periodically delete expired orders from this table, which adds complexity to the business.

Apart from this idempotence-checking distributed lock, there are some other distributed locks implemented purely based on the database that require deleting or modifying data when the lock is released. If after acquiring the lock, the lock is not released, i.e., the data is not deleted or modified, it will result in a deadlock issue.

Implementing Distributed Lock with Zookeeper #

In addition to using a database, we can also implement distributed locks based on Zookeeper. Zookeeper is a centralized service that provides “distributed service coordination”. It is because of the following two features of Zookeeper that distributed applications can implement distributed lock functionality based on it.

Sequential ephemeral nodes: Zookeeper provides a multi-level node namespace (called Znode), where each node is represented by a path separated by slashes ("/"), and each node has a parent node (except for the root node), similar to a file system.

Nodes can be classified as persistent nodes (PERSISTENT) or ephemeral nodes (EPHEMERAL). Each node can also be marked with sequential characteristics (SEQUENTIAL). Once a node is marked with sequential characteristics, the entire node has the characteristic of sequential increment. Generally, we can combine these types of nodes to create the nodes we need. For example, create a persistent node as the parent node and create an ephemeral node under the parent node, marking the ephemeral node as sequential.

Watch mechanism: Zookeeper also provides another important feature, Watcher (event listener). ZooKeeper allows users to register some Watchers on specified nodes, and when certain specific events occur, the ZooKeeper server will notify the users of the events.

After we are familiar with these two features of Zookeeper, let’s take a look at how Zookeeper implements distributed locks.

First, we need to create a parent node with the node type as persistent (PERSISTENT). Whenever we need to access a shared resource, we will create the corresponding sequential child node under the parent node, with the node type as ephemeral (EPHEMERAL) and marked as sequential. The name of the node is composed of the ephemeral node name + parent node name + sequence number.

After creating the child node, we sort all the child nodes whose names start with the ephemeral node name under the parent node, and check if the sequence number of the newly created child node is the smallest. If it is the smallest node, we acquire the lock.

If it is not the smallest node, we block and wait for the lock, and register a watch event for the previous sequential node. We wait for the operation corresponding to the node to acquire the lock.

After calling the shared resource, delete the node, close the ZooKeeper, and trigger the watch event to release the lock.

img

The distributed lock implemented above is a strictly sequential concurrent lock. In general, we can also directly use the Curator framework to implement Zookeeper distributed locks. The code is as follows:

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if (lock.acquire(maxWait, waitUnit)) 
{
    try 
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}

Zookeeper’s implementation of distributed locks has many advantages compared to database implementation. Zookeeper is implemented as a cluster, which can avoid single point failures, and it can guarantee that each operation can effectively release locks. This is because once the application service crashes, the ephemeral nodes will be automatically deleted due to the disconnection of the session.

Due to the frequent creation and deletion of nodes, along with a large number of watch events, Zookeeper clusters experience significant pressure. Moreover, in terms of performance, there is still a certain gap compared to the distributed lock implementation using Redis, which I will discuss next.

Implementing Distributed Lock using Redis #

Compared to the previous two implementation methods, implementing distributed lock using Redis is the most complex but also offers the best performance.

Most developers use the combination of SETNX and EXPIRE methods to implement distributed lock in Redis. Before version 2.6.12, the specific implementation code is as follows:

public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

    Long result = jedis.setnx(lockKey, requestId); // Set the lock
    if (result == 1) { // Lock acquired successfully
        // If the program crashes here before setting the expiration time, a deadlock will occur
        jedis.expire(lockKey, expireTime); // Delete the lock by setting an expiration time
        return true;
    }
    return false;
}

With this approach, the lock is set using the setnx() method. If lockKey already exists, it returns failure; otherwise, it returns success. After successful lock acquisition, in order to release the lock successfully after the synchronized code is executed, the expire() method is also used in the method to set an expiration time for the lockKey value. This ensures that the key is deleted to avoid the situation where the lock cannot be released, preventing the next thread from acquiring the lock, which would result in a deadlock.

If the program crashes before setting the expiration time and after setting the lock, the lockKey will not have an expiration time set, resulting in a deadlock.

Starting from Redis version 2.6.12, SETNX has added an expiration time parameter:

private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";

/**
* Try to acquire distributed lock
* @param jedis Redis client
* @param lockKey Lock
* @param requestId Request identifier
* @param expireTime Expiration time
* @return Whether acquisition is successful
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

    if (LOCK_SUCCESS.equals(result)) {
        return true;
    }
    return false;
}

We can also use Lua scripts to implement the atomicity of lock setting and expiration time, and then execute the script using the jedis.eval() method:

// Lock script
private static final String SCRIPT_LOCK = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then redis.call('pexpire', KEYS[1], ARGV[2]) return 1 else return 0 end";
// Unlock script
private static final String SCRIPT_UNLOCK = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

Although the SETNX method ensures the atomicity of lock setting and expiration time, if the expiration time is set to be short and the business execution time is long, there will be a problem where the lock code block becomes invalid. We need to set the expiration time long enough to prevent this problem from occurring.

This solution is currently the optimal solution for distributed locks. However, if it is used in a Redis cluster environment, there are still issues. Since the data synchronization in Redis clusters is asynchronous, if a Master node acquires the lock but crashes before it is synchronized to other nodes, the new Master node can still acquire the lock. Therefore, multiple application services can acquire the lock at the same time.

Redlock Algorithm #

Redisson is an In-Memory Data Grid implemented based on Redis, which was launched by Redis official. It not only provides a series of distributed Java common objects but also provides many distributed services. Redisson is implemented based on the netty communication framework, so it supports non-blocking communication and generally has better performance compared to the familiar Jedis.

Redisson implements Redis distributed lock, supporting both single node mode and cluster mode. In cluster mode, Redisson uses the Redlock algorithm to prevent multiple applications from acquiring the lock when the Master node crashes and switches to another Master. Let’s take a look at the process of acquiring distributed lock by an application service to understand the implementation of the Redlock algorithm:

The lock is acquired using a single instance on different nodes, and each lock acquisition has a timeout. If the request times out, the node is considered not available. When more than half of the Redis nodes successfully acquire the lock for the application service (N/2+1, where N is the number of nodes), and the actual time spent acquiring the lock does not exceed the expiration time of the lock, the lock acquisition is considered successful.

Once the lock is successfully acquired, the release time of the lock will be recalculated, which is the original release time minus the time spent acquiring the lock. If the lock acquisition fails, the client will still release the node where the lock was successfully acquired.

The specific code implementation is as follows:

  1. First, import the jar package:
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.8.2</version>
</dependency>
  1. Implement the Redisson configuration file:
@Bean
public RedissonClient redissonClient() {
    Config config = new Config();
    config.useClusterServers()
            .setScanInterval(2000) // Cluster state scan interval time in milliseconds
            .addNodeAddress("redis://127.0.0.1:7000").setPassword("1")
            .addNodeAddress("redis://127.0.0.1:7001").setPassword("1")
            .addNodeAddress("redis://127.0.0.1:7002").setPassword("1");
    return Redisson.create(config);
}
  1. Acquiring the lock:
long waitTimeout = 10;
long leaseTime = 1;
RLock lock1 = redissonClient1.getLock("lock1");
RLock lock2 = redissonClient2.getLock("lock2");
RLock lock3 = redissonClient3.getLock("lock3");

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
// Acquire the locks simultaneously: lock1 lock2 lock3
// Redlock algorithm considers it successful if the locks are acquired on most nodes and the total timeout time and individual node timeout time are set
redLock.trylock(waitTimeout, leaseTime, TimeUnit.SECONDS);
...
redLock.unlock();

Summary #

There are many ways to implement distributed locks, including the simplest database implementation, the Zookeeper multi-node implementation, and the cache implementation. We can perform performance testing on these three implementation methods separately and find that under the same server configuration, Redis has the best performance, followed by Zookeeper, and the database has the worst performance.

In terms of implementation method and reliability, Zookeeper’s implementation method is simple and based on a distributed cluster, which can avoid single point issues and has relatively high reliability. Therefore, in scenarios where the business performance requirements are not particularly high, I recommend using a distributed lock implemented with Zookeeper.

Thought Question #

We know that Redis distributed locks may allow multiple application services to acquire the lock simultaneously in a clustered environment, but the Redlock algorithm implemented in Redisson effectively solves this problem. Does this mean that the distributed lock implemented by Redisson will never allow simultaneous lock acquisition?