当前位置:   article > 正文

Redis分布式锁 - 基于Jedis和LUA的分布式锁

Redis分布式锁 - 基于Jedis和LUA的分布式锁

先基于单机模式,基于Jedis手工造轮子实现自己的分布式锁。

首先看两个命令:

Redis 分布式锁机制,主要借助 setnx expire 两个命令完成。

setnx命令:

setnx  set  if not exists  的简写。将 key 的值设为 value ,当且仅当 key 不存在 ; 若给定的 key 已经存在,则 setnx  不做任何动作。
下面为客户端使用示例:
  1. 127.0.0.1:6379> set lock "unlock"
  2. OK
  3. 127.0.0.1:6379> setnx lock "unlock"
  4. (integer) 0
  5. 127.0.0.1:6379> setnx lock "lock"
  6. (integer) 0
  7. 127.0.0.1:6379>

expire命令:

expire 命令为 key 设置生存时间,当 key 过期时 ( 生存时间为 0 ) ,它会被自动删除。
其格式为: expire key seconds
下面为客户端使用示例:
  1. 127.0.0.1:6379> expire lock 10
  2. (integer) 1
  3. 127.0.0.1:6379> ttl lock
  4. 8

基于Jedis API的分布式锁的总体流程:

通过 Redis setnx expire 命令可以实现简单的锁机制:
  • key不存在时创建,并设置value和过期时间,返回值为1;成功获取到锁;
  • key存在时直接返回0,抢锁失败;
  • 持有锁的线程释放锁时,手动删除key; 或者过期时间到,key自动删除,锁释放。

线程调用setnx方法成功返回1认为加锁成功,其他线程要等到当前线程业务操作完成释放锁后,才能再次调用setnx加锁成功。

以上简单redis分布式锁的问题:

如果出现了这么一个问题:如果 setnx 是成功的,但是 expire 设置失败,一旦出现了释放锁失败,或者没有手工释放,那么这个锁永远被占用,其他线程永远也抢不到锁。
所以 , 需要保障 setnx expire 两个操作的原子性,要么全部执行,要么全部不执行,二者不能分开。

解决的办法有两种: 

  • 使用set的命令时,同时设置过期时间,不再单独使用 expire 命令;
  • 使用lua脚本,将加锁的命令放在lua脚本中原子性的执行。

简单加锁:使用set的命令时,同时设置过期时间

使用set的命令时,同时设置过期时间的示例如下:

  1. 127.0.0.1:6379> set unlock "234" EX 100 NX
  2. (nil)
  3. 127.0.0.1:6379>
  4. 127.0.0.1:6379> set test "111" EX 100 NX
  5. OK
这样就完美的解决了分布式锁的原子性; set 命令的完整格式:
set key value [EX seconds] [PX milliseconds] [NX|XX] 
  1. EX seconds:设置失效时长,单位秒
  2. PX milliseconds:设置失效时长,单位毫秒
  3. NX:key不存在时设置value,成功返回OK,失败返回(nil)
  4. XX:key存在时设置value,成功返回OK,失败返回(nil)
加锁的简单代码实现
  1. @Slf4j
  2. @Data
  3. @AllArgsConstructor
  4. public class JedisCommandLock {
  5. private RedisTemplate redisTemplate;
  6. private static final String LOCK_SUCCESS = "OK";
  7. private static final String SET_IF_NOT_EXIST = "NX";
  8. private static final String SET_WITH_EXPIRE_TIME = "PX";
  9. /**
  10. * 尝试获取分布式锁
  11. * @param jedis Redis客户端
  12. * @param lockKey 锁
  13. * @param requestId 请求标识
  14. * @param expireTime 超期时间
  15. * @return 是否获取成功
  16. */
  17. public static boolean tryGetDistributedLock(Jedis jedis, String lockKey,
  18. String requestId, int expireTime) {
  19. String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST,
  20. SET_WITH_EXPIRE_TIME, expireTime);
  21. if (LOCK_SUCCESS.equals(result)) {
  22. return true;
  23. }
  24. return false;
  25. }
  26. }
可以看到,我们加锁用到了 Jedis set Api
jedis.set(String key, String value, String nxxx, String expx, int time)
这个 set() 方法一共有五个形参:
  • 第一个为key,我们使用key来当锁,因为key是唯一的。
  • 第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。 requestId可以使用 UUID.randomUUID().toString() 方法生成。
  • 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
  • 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
  • 第五个为time,与第四个参数相呼应,代表key的过期时间。

 总的来说,执行上面的set()方法就只会导致两种结果:

1. 当前没有锁( key 不存在),那么就进行加锁操作,并对锁设置个有效期,同时 value 表示加锁的客户端。
2. 已有锁存在,不做任何操作。
心细的童鞋就会发现了,我们的加锁代码满足前面描述的四个条件中的三个。
  • 首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。
  • 其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会被永远占用(而发生死锁)。
  • 最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。
  • 由于我们只考虑Redis单机部署的场景,所以容错性我们暂不考虑。
 基于Jedis API实现简单解锁代码
  1. @Slf4j
  2. @Data
  3. @AllArgsConstructor
  4. public class JedisCommandLock {
  5. private static final Long RELEASE_SUCCESS = 1L;
  6. /**
  7. * 释放分布式锁
  8. * @param jedis Redis客户端
  9. * @param lockKey 锁
  10. * @param requestId 请求标识
  11. * @return 是否释放成功
  12. */
  13. public static boolean releaseDistributedLock(Jedis jedis, String lockKey,
  14. String requestId) {
  15. String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return
  16. redis.call('del', KEYS[1]) else return 0 end";
  17. Object result = jedis.eval(script, Collections.singletonList(lockKey),
  18. Collections.singletonList(requestId));
  19. if (RELEASE_SUCCESS.equals(result)) {
  20. return true;
  21. }
  22. return false;
  23. }
  24. }
那么这段  Lua  代码的功能是什么呢?
其实很简单,首先获取锁对应的  value  值,检查是否与  requestId  相等,如果相等则删除锁(解锁)。
第一行代码,我们写了一个简单的 Lua 脚本代码。
第二行代码,我们将 Lua 代码传到 jedis.eval() 方法里,并使参数  KEYS[1]  赋值为  lockKey ARGV[1] 赋值为requestId。 eval()  方法是将  Lua  代码交给  Redis  服务端执行。
那么为什么要使用 Lua 语言来实现呢?
因为要确保上述操作是原子性的。那么为什么执行  eval()  方法可以确保原子性,源于  Redis 的特性 . 简单来说,就是在 eval  命令执行  Lua  代码的时候, Lua  代码将被当成一个命令去执行,并且直到 eval 命令执行完成, Redis  才会执行其他命
 错误示例1:
最常见的解锁代码就是直接使用 jedis.del() 方法删除锁,这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的。
  1. public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
  2. jedis.del(lockKey);
  3. }
错误示例2 :
这种解锁代码乍一看也是没问题,甚至我之前也差点这样实现,与正确姿势差不多,唯一区别的是分成两条命令去执行,代码如下:
  1. public static void wrongReleaseLock2(Jedis jedis, String lockKey, String
  2. requestId) {
  3. // 判断加锁与解锁是不是同一个客户端
  4. if (requestId.equals(jedis.get(lockKey))) {
  5. // 若在此时,这把锁突然不是这个客户端的,则会误解锁
  6. jedis.del(lockKey);
  7. }
  8. }
基于Lua脚本实现分布式锁
lua脚本的好处

        为什么要使用Lua语言来实现呢? 因为要确保上述操作是原子性的。那么为什么执行 eval()方法可以确保原子性,源于Redis的特性,简单来说,就是在 eval 命令执行 Lua 代码的时候,Lua代码将被当成一个命令去执行,并且直到 eval 命令执行完成,Redis才会执行其他命令。大部分的开源框架(如 redission)中的分布式锁组件,都是用纯lua脚本实现的。Lua 脚本是高并发、高性能的必备脚本语言。

基于纯Lua脚本的分布式锁的执行流程

加锁和删除锁的操作,使用纯 lua 进行封装,保障其执行时候的原子性。
基于纯Lua脚本实现分布式锁的执行流程,大致如下:
 加锁的Lua脚本: lock.lua
  1. --- -1 failed
  2. --- 1 success
  3. ---
  4. local key = KEYS[1]
  5. local requestId = KEYS[2]
  6. local ttl = tonumber(KEYS[3])
  7. local result = redis.call('setnx', key, requestId)
  8. if result == 1 then
  9. --PEXPIRE:以毫秒的形式指定过期时间
  10. redis.call('pexpire', key, ttl)
  11. else
  12. result = -1;
  13. -- 如果value相同,则认为是同一个线程的请求,则认为重入锁
  14. local value = redis.call('get', key)
  15. if (value == requestId) then
  16. result = 1;
  17. redis.call('pexpire', key, ttl)
  18. end
  19. end
  20. -- 如果获取锁成功,则返回 1
  21. return result
解锁的Lua脚本: unlock.lua
  1. --- -1 failed
  2. --- 1 success
  3. -- unlock key
  4. local key = KEYS[1]
  5. local requestId = KEYS[2]
  6. local value = redis.call('get', key)
  7. if value == requestId then
  8. redis.call('del', key);
  9. return 1;
  10. end
  11. return -1
Java中调用lua脚本,完成加锁操作
  1. import lombok.AllArgsConstructor;
  2. import lombok.Data;
  3. import lombok.extern.slf4j.Slf4j;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. import java.util.concurrent.TimeUnit;
  7. import java.util.concurrent.locks.Lock;
  8. @Slf4j
  9. @Data
  10. @AllArgsConstructor
  11. public class JedisLock implements Lock {
  12. private RedisTemplate redisTemplate;
  13. RedisScript<Long> lockScript = null;
  14. RedisScript<Long> unLockScript = null;
  15. public static final int DEFAULT_TIMEOUT = 2000;
  16. public static final Long LOCKED = Long.valueOf(1);
  17. public static final Long UNLOCKED = Long.valueOf(1);
  18. public static final Long WAIT_GAT = Long.valueOf(200);
  19. public static final int EXPIRE = 2000;
  20. String key;
  21. String lockValue; // lockValue 锁的value ,代表线程的uuid
  22. /**
  23. * 默认为2000ms
  24. */
  25. long expire = 2000L;
  26. public JedisLock(String lockKey, String lockValue) {
  27. this.key = lockKey;
  28. this.lockValue = lockValue;
  29. }
  30. private volatile boolean isLocked = false;
  31. private Thread thread;
  32. /**
  33. * 获取一个分布式锁 , 超时则返回失败
  34. *
  35. * @return 获锁成功 - true | 获锁失败 - false
  36. */
  37. @Override
  38. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  39. //本地可重入
  40. if (isLocked && thread == Thread.currentThread()) {
  41. return true;
  42. }
  43. expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT;
  44. long startMillis = System.currentTimeMillis();
  45. Long millisToWait = expire;
  46. boolean localLocked = false;
  47. int turn = 1;
  48. while (!localLocked) {
  49. localLocked = this.lockInner(expire);
  50. if (!localLocked) {
  51. millisToWait = millisToWait - (System.currentTimeMillis() -
  52. startMillis);
  53. startMillis = System.currentTimeMillis();
  54. if (millisToWait > 0L) {
  55. /**
  56. * 还没有超时
  57. */
  58. ThreadUtil.sleepMilliSeconds(WAIT_GAT);
  59. log.info("睡眠一下,重新开始,turn:{},剩余时间:{}", turn++,
  60. millisToWait);
  61. } else {
  62. log.info("抢锁超时");
  63. return false;
  64. }
  65. } else {
  66. isLocked = true;
  67. localLocked = true;
  68. }
  69. }
  70. return isLocked;
  71. }
  72. /**
  73. * 有返回值的抢夺锁
  74. *
  75. * @param millisToWait
  76. */
  77. public boolean lockInner(Long millisToWait) {
  78. if (null == key) {
  79. return false;
  80. }
  81. try {
  82. List<String> redisKeys = new ArrayList<>();
  83. redisKeys.add(key);
  84. redisKeys.add(lockValue);
  85. redisKeys.add(String.valueOf(millisToWait));
  86. Long res = (Long) redisTemplate.execute(lockScript, redisKeys);
  87. return res != null && res.equals(LOCKED);
  88. } catch (Exception e) {
  89. e.printStackTrace();
  90. throw BusinessException.builder().errMsg("抢锁失败").build();
  91. }
  92. }
  93. }
Java中调用lua脚本,完成解锁操作
  1. import lombok.AllArgsConstructor;
  2. import lombok.Data;
  3. import lombok.extern.slf4j.Slf4j;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. @Slf4j
  7. @Data
  8. @AllArgsConstructor
  9. public class JedisLock implements Lock {
  10. private RedisTemplate redisTemplate;
  11. RedisScript<Long> lockScript = null;
  12. RedisScript<Long> unLockScript = null;
  13. //释放锁
  14. @Override
  15. public void unlock() {
  16. if (key == null || requestId == null) {
  17. return;
  18. }
  19. try {
  20. List<String> redisKeys = new ArrayList<>();
  21. redisKeys.add(key);
  22. redisKeys.add(requestId);
  23. Long res = (Long) redisTemplate.execute(unLockScript, redisKeys);
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. throw BusinessException.builder().errMsg("释放锁失败").build();
  27. }
  28. }
  29. }
 编写RedisLockService用于管理JedisLock
编写个分布式锁服务,用于加载  lua  脚本,创建分布式锁,代码如下:
  1. import com.crazymaker.springcloud.common.util.IOUtil;
  2. import lombok.Data;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.commons.lang3.StringUtils;
  5. import org.springframework.data.redis.core.RedisTemplate;
  6. import org.springframework.data.redis.core.script.DefaultRedisScript;
  7. import org.springframework.data.redis.core.script.RedisScript;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. import java.util.concurrent.TimeUnit;
  11. import java.util.concurrent.locks.Lock;
  12. @Slf4j
  13. @Data
  14. public class RedisLockService {
  15. private RedisTemplate redisTemplate;
  16. static String lockLua = "script/lock.lua";
  17. static String unLockLua = "script/unlock.lua";
  18. static RedisScript<Long> lockScript = null;
  19. static RedisScript<Long> unLockScript = null;
  20. {
  21. String script =
  22. IOUtil.loadJarFile(RedisLockService.class.getClassLoader(), lockLua);
  23. // String script = FileUtil.readString(lockLua, Charset.forName("UTF-8"));
  24. if (StringUtils.isEmpty(script)) {
  25. log.error("lua load failed:" + lockLua);
  26. }
  27. lockScript = new DefaultRedisScript<>(script, Long.class);
  28. // script = FileUtil.readString(unLockLua, Charset.forName("UTF-8"));
  29. script =
  30. IOUtil.loadJarFile(RedisLockService.class.getClassLoader(), unLockLua);
  31. if (StringUtils.isEmpty(script)) {
  32. log.error("lua load failed:" + unLockLua);
  33. }
  34. unLockScript = new DefaultRedisScript<>(script, Long.class);
  35. }
  36. public RedisLockService(RedisTemplate redisTemplate) {
  37. this.redisTemplate = redisTemplate;
  38. }
  39. public Lock getLock(String lockKey, String lockValue) {
  40. JedisLock lock = new JedisLock(lockKey, lockValue);
  41. lock.setRedisTemplate(redisTemplate);
  42. lock.setLockScript(lockScript);
  43. lock.setUnLockScript(unLockScript);
  44. return lock;
  45. }
  46. }
测试用例
  1. import lombok.extern.slf4j.Slf4j;
  2. import javax.annotation.Resource;
  3. import java.util.UUID;
  4. import java.util.concurrent.CountDownLatch;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.TimeUnit;
  8. @Slf4j
  9. @RunWith(SpringRunner.class)
  10. @SpringBootTest(classes = {DemoCloudApplication.class})
  11. // 指定启动类
  12. public class RedisLockTest {
  13. @Resource
  14. RedisLockService redisLockService;
  15. private ExecutorService pool = Executors.newFixedThreadPool(10);
  16. @Test
  17. public void testLock() {
  18. int threads = 10;
  19. final int[] count = {0};
  20. CountDownLatch countDownLatch = new CountDownLatch(threads);
  21. long start = System.currentTimeMillis();
  22. for (int i = 0; i < threads; i++) {
  23. pool.submit(() ->
  24. {
  25. String lockValue = UUID.randomUUID().toString();
  26. try {
  27. Lock lock = redisLockService.getLock("test:lock:1",
  28. lockValue);
  29. boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
  30. if (locked) {
  31. for (int j = 0; j < 1000; j++) {
  32. count[0]++;
  33. }
  34. log.info("count = " + count[0]);
  35. lock.unlock();
  36. } else {
  37. System.out.println("抢锁失败");
  38. }
  39. } catch (Exception e) {
  40. e.printStackTrace();
  41. }
  42. countDownLatch.countDown();
  43. });
  44. }
  45. try {
  46. countDownLatch.await();
  47. } catch (InterruptedException e) {
  48. e.printStackTrace();
  49. }
  50. System.out.println("10个线程每个累加1000为: = " + count[0]);
  51. //输出统计结果
  52. float time = System.currentTimeMillis() - start;
  53. System.out.println("运行的时长为(ms):" + time);
  54. System.out.println("每一次执行的时长为(ms):" + time / count[0]);
  55. }
  56. }

执行结果

  1. 2021-05-04 23:02:11.900 INFO 22120 --- [pool-1-thread-7]
  2. c.c.springcloud.lock.RedisLockTest LN:50 count = 6000
  3. 2021-05-04 23:02:11.901 INFO 22120 --- [pool-1-thread-1]
  4. c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新开始,turn:3,剩余时间:
  5. 9585
  6. 2021-05-04 23:02:11.902 INFO 22120 --- [pool-1-thread-1]
  7. c.c.springcloud.lock.RedisLockTest LN:50 count = 7000
  8. 2021-05-04 23:02:12.100 INFO 22120 --- [pool-1-thread-4]
  9. c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新开始,turn:3,剩余时间:
  10. 9586
  11. 2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-5]
  12. c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新开始,turn:3,剩余时间:
  13. 9585
  14. 2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-8]
  15. c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新开始,turn:3,剩余时间:
  16. 9585
  17. 2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-4]
  18. c.c.springcloud.lock.RedisLockTest LN:50 count = 8000
  19. 2021-05-04 23:02:12.102 INFO 22120 --- [pool-1-thread-8]
  20. c.c.springcloud.lock.RedisLockTest LN:50 count = 9000
  21. 2021-05-04 23:02:12.304 INFO 22120 --- [pool-1-thread-5]
  22. c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新开始,turn:4,剩余时间:
  23. 9383
  24. 2021-05-04 23:02:12.307 INFO 22120 --- [pool-1-thread-5]
  25. c.c.springcloud.lock.RedisLockTest LN:50 count = 10000
  26. 10个线程每个累加1000为: = 10000
  27. 运行的时长为(ms):827.0
  28. 每一次执行的时长为(ms):0.0827

STW导致的锁过期问题

下面有一个简单的使用锁的例子,在 10 秒内占着锁:
  1. //写数据到文件
  2. public void writeData(filename,data){
  3. boolean locked=lock.tryLock(10,TimeUnit.SECONDS);
  4. if(!locked){
  5. throw'Failed to acquire lock';
  6. }
  7. try{
  8. //将数据写到文件
  9. var file=storage.readFile(filename);
  10. var updated=updateContents(file,data);
  11. storage.writeFile(filename,updated);
  12. }finally{
  13. lock.unlock();
  14. }
  15. }
问题是:如果在写文件过程中,发生了 fullGC ,并且其时间跨度较长, 超过了 10 秒, 那么,分布式锁就自动释放了。
在此过程中, client2 抢到锁,写了文件。
client1 fullGC 完成后,也继续写文件, 注意,此时 client1 的并没有占用锁,此时写入会导致文件数 据错乱,发生线程安全问题,这就是STW导致的锁过期问题。

 

STW导致的锁过期问题,大概的解决方案 

 1: 模拟CAS乐观锁的方式,增加版本号(如下图中的token)
2watch dog自动延期机制 
客户端 1 加锁的锁 key 默认生存时间才 30 秒,如果超过了 30 秒,客户端 1 还想一直持有这把锁,怎么办?
简单!只要客户端 1 一旦加锁成功,就会启动一个 watch dog 看门狗, 他是一个后台线程,会每隔 10 秒检查一下,如果客户端 1 还持有锁 key ,那么就会不断的延长锁 key 的生存时间。

 redission,采用的就是这种方案, 此方案不会入侵业务代码

注意:
单机版的  watch dog 并不能解决 STW  的过期问题, 需要分布式版本的 watch dog , 独立的看门狗服务。
锁删除之后, 取消看门狗服务的对应的 key 记录, 当然,这就使得系统变得复杂, 还要保证看门狗服务的高并发、高可用、数据一致性的问题。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/502683
推荐阅读
相关标签
  

闽ICP备14008679号