当前位置:   article > 正文

小王,在 Java 中如何利用 redis 实现一个分布式锁服务呢???

java redis 怎么实现分布式锁?

作者:杨高超

juejin.im/post/5a4984af6fb9a0450b66bc57

在现代的编程语言中,接触过多线程编程的程序员多多少少对锁有一定的了解。简单的说,多线程中的锁就是在多线程环境下,多个线程对共享资源进行修改的时候,保证共享资源一致性的机制。这里不展开说。在分布式环境下,原来的多线程的锁就不管用了,也就出现了分布式锁的需求。所谓分布式锁服务也就是在分布式环境下,保证多个分布式的服务共享的资源一致性的服务。

在分布式环境下实现一个分布式锁服务并不太容易,需要考虑很多在单进程下的锁服务不需要考虑的问题。分布式锁锁的实现也有很多。这里我们讨论在 Java 中通过 redis 来实现。

在 GitHub 中的 redisson 项目中已经有开源的实现。但是那个太复杂了。现在我们来基于单机的 redis 实现一个简单的分布式锁服务。这个服务必须满足下面的要求

  • 支持立即获取锁方式,如果获取到返回true,获取不到则返回false;

  • 支持等待获取锁方式,如果获取到,直接返回true,获取不到在等待一小段时间,在这一小段时间内反复尝试,如果尝试成功,则返回true,等待时间过后还获取不到则返回false;

  • 不能产生死锁的情况;

  • 不能释放非自己加的锁;

下面我们用实例来演示在 Java 中利用 redis 实现分布式锁服务

加锁

通过 redis 来实现分布式锁的加锁逻辑如下所示:

根据这个逻辑,实现上锁的核心代码如下所示:

  1. jedis.select(dbIndex);
  2. String key = KEY_PRE + key;
  3. String value = fetchLockValue();
  4. if(jedis.exists(key)){
  5.   jedis.set(key,value);
  6.   jedis.expire(key,lockExpirseTime);
  7.   return value;
  8. }

表面上看这段代码好像没有什么问题,实际上并不能在分布式环境中正确的实现加锁的操作。要能够正确的实现加锁操作,“判断 key 是否存在”、“保存 key-value”、“设置 key 的过期时间”这三步操作必须是原子操作。如果不是原子操作,那么可能会出现下面两种情况:

  • “判断 key 是否存在”得出 key 不存在的结果步骤后,“保存 key-value”步骤前,另一个客户端执行同样的逻辑,并且执行到了“判断 key 是否存在”步骤,同样得出了 key 不存在的结果。这样回导致多个客户端获得了同一把锁;

  • 在客户端执行完“保存 key-value” 步骤后,需要设置一个 key 的过期时间,防止客户端因为代码质量未解锁,在或者进程崩溃未解锁导致的死锁情况。在“保存 key-value”步骤之后,“设置 key 的过期时间”步骤之前,可能进程崩溃,导致“设置 key 的过期时间”步骤失败;

redis 在2.6.12版本之后,对 set 命令进行了扩充,能够规避上面的两个问题。新版的 redis set 命令的参数如下

SET key value [EX seconds] [PX milliseconds] [NX|XX]

新版的 set 命令增加了 EX 、 PX 、 NX|XX 参数选项。他们的含义如下

  • EX seconds – 设置键 key 的过期时间,单位时秒

  • PX milliseconds – 设置键 key 的过期时间,单位时毫秒

  • NX – 只有键 key 不存在的时候才会设置 key 的值

  • XX – 只有键 key 存在的时候才会设置 key 的值

这样,原来的三步操作就可以在一个 set 的原子操作里面来完成,规避了上面我们提到的两个问题。

新版的 redis 加锁核心代码修改如下所示:

  1. jedis = redisConnection.getJedis();
  2. jedis.select(dbIndex);
  3. String key = KEY_PRE + key;
  4. String value = fetchLockValue();
  5. if ("OK".equals(jedis.set(key, value, "NX""EX", lockExpirseTime))) {
  6.     return value;
  7. }

解锁

解锁的基本流程如下:

根据这个逻辑,在 Java 中解锁的核心代码如下所示:

  1. jedis.select(dbIndex);
  2. String key = KEY_PRE + key;
  3. if(jedis.exists(key) && value.equals(jedis.get(key))){
  4.     jedis.del(key);
  5.     return true;
  6. }
  7. return false;

和加锁的时候一样,key 是否存在、判断是否自己持有锁、**删除 key-value **这三步操作需要是原子操作,否则当一个客户端执行完“判断是否自己持有锁”步骤后,得出自己持有锁的结论,此时锁的过期时间到了,自动被 redis 释放了,同时另一个客户端又基于这个 key 加锁成功,如果第一个客户端还继续执行删除 key-value的操作,就将不属于自己的锁给释放了。

这显然是不运行的。在这里我们利用 redis 执行 Lua 脚本的能力来解决原子操作的问题。修改后的解锁核心代码如下所示:

  1. jedis.select(dbIndex);
  2. String key = KEY_PRE + key;
  3. String command = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
  4. if (1L.equals(jedis.eval(command, Collections.singletonList(key), Collections.singletonList(value)))) {
  5.     return true;
  6. }

另外,判断是否自己持有锁的机制是用加锁的时候的 key-value 来判断当前的 key 的值是否等于自己持有锁时获得的值。所以加锁的时候的 value 必须是一个全局唯一的字符串。扩展:分布式全局唯一ID生成策略

完整的代码如下所示

  1. package com.x9710.common.redis.impl;
  2. import com.x9710.common.redis.LockService;
  3. import com.x9710.common.redis.RedisConnection;
  4. import org.apache.commons.logging.Log;
  5. import org.apache.commons.logging.LogFactory;
  6. import redis.clients.jedis.Jedis;
  7. import java.text.DateFormat;
  8. import java.text.SimpleDateFormat;
  9. import java.util.Collections;
  10. import java.util.Date;
  11. import java.util.UUID;
  12. /**
  13.  * 分布式锁 redis 实现
  14.  *
  15.  * @author 杨高超
  16.  * @since 2017-12-14
  17.  */
  18. public class LockServiceRedisImpl implements LockService {
  19. private static Log log = LogFactory.getLog(LockServiceRedisImpl.class);
  20. private static String SET_SUCCESS = "OK";
  21. private static String KEY_PRE = "REDIS_LOCK_";
  22. private DateFormat df = new SimpleDateFormat("yyyyMMddHHmmssSSS");
  23. private RedisConnection redisConnection;
  24. private Integer dbIndex;
  25. private Integer lockExpirseTime;
  26. private Integer tryExpirseTime;
  27. public void setRedisConnection(RedisConnection redisConnection) {
  28.     this.redisConnection = redisConnection;
  29. }
  30. public void setDbIndex(Integer dbIndex) {
  31.     this.dbIndex = dbIndex;
  32. }
  33. public void setLockExpirseTime(Integer lockExpirseTime) {
  34.     this.lockExpirseTime = lockExpirseTime;
  35. }
  36. public void setTryExpirseTime(Integer tryExpirseTime) {
  37.     this.tryExpirseTime = tryExpirseTime;
  38. }
  39. public String lock(String key) {
  40.     Jedis jedis = null;
  41.     try {
  42.         jedis = redisConnection.getJedis();
  43.         jedis.select(dbIndex);
  44.         key = KEY_PRE + key;
  45.         String value = fetchLockValue();
  46.         if (SET_SUCCESS.equals(jedis.set(key, value, "NX""EX", lockExpirseTime))) {
  47.             log.debug("Reids Lock key : " + key + ",value : " + value);
  48.             return value;
  49.         }
  50.     } catch (Exception e) {
  51.         e.printStackTrace();
  52.     } finally {
  53.         if (jedis != null) {
  54.             jedis.close();
  55.         }
  56.     }
  57.     return null;
  58. }
  59. public String tryLock(String key) {
  60.     Jedis jedis = null;
  61.     try {
  62.         jedis = redisConnection.getJedis();
  63.         jedis.select(dbIndex);
  64.         key = KEY_PRE + key;
  65.         String value = fetchLockValue();
  66.         Long firstTryTime = new Date().getTime();
  67.         do {
  68.             if (SET_SUCCESS.equals(jedis.set(key, value, "NX""EX", lockExpirseTime))) {
  69.                 log.debug("Reids Lock key : " + key + ",value : " + value);
  70.                 return value;
  71.             }
  72.             log.info("Redis lock failure,waiting try next");
  73.             try {
  74.                 Thread.sleep(100);
  75.             } catch (InterruptedException e) {
  76.                 e.printStackTrace();
  77.             }
  78.         } while ((new Date().getTime() - tryExpirseTime * 1000) < firstTryTime);
  79.     } catch (Exception e) {
  80.         e.printStackTrace();
  81.     } finally {
  82.         if (jedis != null) {
  83.             jedis.close();
  84.         }
  85.     }
  86.     return null;
  87. }
  88. public boolean unLock(String key, String value) {
  89.     Long RELEASE_SUCCESS = 1L;
  90.     Jedis jedis = null;
  91.     try {
  92.         jedis = redisConnection.getJedis();
  93.         jedis.select(dbIndex);
  94.         key = KEY_PRE + key;
  95.         String command = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
  96.         if (RELEASE_SUCCESS.equals(jedis.eval(command, Collections.singletonList(key), Collections.singletonList(value)))) {
  97.             return true;
  98.         }
  99.     } catch (Exception e) {
  100.         e.printStackTrace();
  101.     } finally {
  102.         if (jedis != null) {
  103.             jedis.close();
  104.         }
  105.     }
  106.     return false;
  107. }
  108. /**
  109.  * 生成加锁的唯一字符串
  110.  *
  111.  * @return 唯一字符串
  112.  */
  113. private String fetchLockValue() {
  114.     return UUID.randomUUID().toString() + "_" + df.format(new Date());
  115. }
  116. }

测试代码

  1. package com.x9710.common.redis.test;
  2. import com.x9710.common.redis.RedisConnection;
  3. import com.x9710.common.redis.impl.LockServiceRedisImpl;
  4. public class RedisLockTest {
  5. public static void main(String[] args) {
  6.     for (int i = 0; i < 9; i++) {
  7.         new Thread(new Runnable() {
  8.             public void run() {
  9.                 RedisConnection redisConnection = RedisConnectionUtil.create();
  10.                 LockServiceRedisImpl lockServiceRedis = new LockServiceRedisImpl();
  11.                 lockServiceRedis.setRedisConnection(redisConnection);
  12.                 lockServiceRedis.setDbIndex(15);
  13.                 lockServiceRedis.setLockExpirseTime(20);
  14.                 String key = "20171228";
  15.                 String value = lockServiceRedis.lock(key);
  16.                 try {
  17.                     if (value != null) {
  18.                         System.out.println(Thread.currentThread().getName() + " lock key = " + key + " success! ");
  19.                         Thread.sleep(25 * 1000);
  20.                     }else{
  21.                         System.out.println(Thread.currentThread().getName() + " lock key = " + key + " failure! ");
  22.                     }
  23.                 } catch (Exception e) {
  24.                     e.printStackTrace();
  25.                 } finally {
  26.                     if (value == null) {
  27.                         value = "";
  28.                     }
  29.                     System.out.println(Thread.currentThread().getName() + " unlock key = " + key + " " + lockServiceRedis.unLock(key, value));
  30.                 }
  31.             }
  32.         }).start();
  33.     }
  34. }
  35. }

测试结果

  1. Thread-1 lock key = 20171228 failure! 
  2. Thread-2 lock key = 20171228 failure! 
  3. Thread-4 lock key = 20171228 failure! 
  4. Thread-8 lock key = 20171228 failure! 
  5. Thread-7 lock key = 20171228 failure! 
  6. Thread-3 lock key = 20171228 failure! 
  7. Thread-5 lock key = 20171228 failure! 
  8. Thread-0 lock key = 20171228 failure! 
  9. Thread-6 lock key = 20171228 success! 
  10. Thread-1 unlock key = 20171228 false
  11. Thread-2 unlock key = 20171228 false
  12. Thread-4 unlock key = 20171228 false
  13. Thread-8 unlock key = 20171228 false
  14. Thread-3 unlock key = 20171228 false
  15. Thread-5 unlock key = 20171228 false
  16. Thread-0 unlock key = 20171228 false
  17. Thread-7 unlock key = 20171228 false
  18. Thread-6 unlock key = 20171228 true

从测试结果来看可以看到,9个线程同时给一个 key 加锁,只有一个能够成功获取到锁,其余的客户端都无法获取到锁。

后记

这个代码里面还实现了一个 tryLock 的接口。这个主要是客户端无法获取到锁的时候会在一小段时间内反复尝试是否能够获取到锁。

Redis其他文章扩展:

从入门到入土,Redis简明教程

来讨论一下这些常见的 Redis 面试题

Springboot + redis + 注解 + 拦截器来实现接口幂等性校验

文中代码

https://github.com/gaochao2000/redis_util

END

Java面试题专栏

【61期】MySQL行锁和表锁的含义及区别(MySQL面试第四弹)

【62期】解释一下MySQL中内连接,外连接等的区别(MySQL面试第五弹)

【63期】谈谈MySQL 索引,B+树原理,以及建索引的几大原则(MySQL面试第六弹)

【64期】MySQL 服务占用cpu 100%,如何排查问题? (MySQL面试第七弹)

【65期】Spring的IOC是啥?有什么好处?

【66期】Java容器面试题:谈谈你对 HashMap 的理解

【67期】谈谈ConcurrentHashMap是如何保证线程安全的?

【68期】面试官:对并发熟悉吗?说说Synchronized及实现原理

【69期】面试官:对并发熟悉吗?谈谈线程间的协作(wait/notify/sleep/yield/join)

【70期】面试官:对并发熟悉吗?谈谈对volatile的使用及其原理

我知道你 “在看”
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/696700
推荐阅读
相关标签
  

闽ICP备14008679号