当前位置:   article > 正文

分布式锁 — Redisson 全面解析

redisson分布式锁

前言

分布式锁主要是解决集群,分布式下数据一致性的问题。在单机的环境下,应用是在同一进程下的,只需要保证单进程多线程环境中的线程安全性,通过 JAVA 提供的 volatileReentrantLocksynchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。

分布式锁的实现主要有以下方式:

  • 基于数据库

  • 基于分布式协调系统

  • 基于缓存

    • 基于redis命令。如:setnx等操作

    • 基于redis Lua脚本能力(本文介绍的实现方式 redisson)

一、Redisson的使用

Redisson 支持单点模式、主从模式、哨兵模式、集群模式,这里以单点模式为例

引入maven依赖

  1. <!-- maven版本号根据项目版本自行调整 -->
  2. <!--redis-->
  3. <dependency>
  4.     <groupId>org.springframework.boot</groupId>
  5.     <artifactId>spring-boot-starter-data-redis</artifactId>
  6.     <version>2.4.0</version>
  7. </dependency>
  8. <!--使用redisson作为分布式锁-->
  9. <dependency>
  10.     <groupId>org.redisson</groupId>
  11.     <artifactId>redisson</artifactId>
  12.     <version>3.16.8</version>
  13. </dependency>

yml配置

  1. spring:
  2.   redis:
  3.     # Redis数据库索引(默认为0
  4.     database: 0
  5.     # Redis服务器地址
  6.     host: 127.0.0.1
  7.     # Redis服务器连接端口
  8.     port: 6379
  9.     # Redis服务器链接密码(默认为空)
  10.     password:
  11.     jedis:
  12.       pool:
  13.         # 连接池最大链接数(负值表示没有限制)
  14.         max-active: 20
  15.         # 连接池最大阻塞等待时间(负值表示没有限制)
  16.         max-wait: -1
  17.         # 链接池中最大空闲链接
  18.         max-idle: 10
  19.         # 连接池中最小空闲链接
  20.         min-idle: 0
  21.     # 链接超市时间(毫秒)
  22.     timeout: 1000

Redisson配置

  1. @Configuration
  2. public class RedissonConfig {
  3.     @Value("${spring.redis.host}")
  4.     private String redisHost;
  5.     @Value("${spring.redis.password}")
  6.     private String redisPassword;
  7.     @Value("${spring.redis.port}")
  8.     private String port;
  9.     @Bean
  10.     @ConditionalOnMissingBean
  11.     public RedissonClient redissonClient() {
  12.         Config config = new Config();
  13.         //单机模式  依次设置redis地址和密码
  14.         System.out.println(redisHost);
  15.         // config.useSingleServer().setAddress("redis://" + redisHost + ":" + port).setPassword(redisPassword);
  16.         // 没有配置redis密码
  17.         config.useSingleServer().setAddress("redis://" + redisHost + ":" + port);
  18.         return Redisson.create(config);
  19.     }
  20. }

测试用例

  1. package com.example.aopdemo;
  2. import com.example.aopdemo.springbootaopdemo.SpringBootDemoxzApplication;
  3. import java.util.concurrent.TimeUnit;
  4. import javax.annotation.Resource;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.junit.jupiter.api.Test;
  7. import org.redisson.api.RLock;
  8. import org.redisson.api.RedissonClient;
  9. import org.springframework.boot.test.context.SpringBootTest;
  10. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  11. /**
  12.  * @ClassName RedissonTest
  13.  * @Description Redisson测试用例
  14.  * @Author 阿Q
  15.  * @Date 2022/11/26
  16.  */
  17. @Slf4j
  18. @SpringBootTest(classes = SpringBootDemoxzApplication.class)
  19. public class RedissonTest {
  20.     @Resource
  21.     private RedissonClient redissonClient;
  22.     @Resource
  23.     private ThreadPoolTaskExecutor executor;
  24.     // redisson分布式锁的key
  25.     private static final String LOCK_TEST_KEY = "redisson:lock:test";
  26.     int n = 500;
  27.     /**
  28.      * 分布式锁测试用例
  29.      */
  30.     @Test
  31.     public void lockTest() {
  32.         // 利用 循环+多线程 模仿高并发请求
  33.         for (int i = 0; i < 10; i++) {
  34.             executor.execute(() -> {
  35.                 // 这里获取公平锁,遵循先进先出原则,方便测试
  36.                 RLock fairLock = redissonClient.getFairLock(LOCK_TEST_KEY);
  37.                 try {
  38.                     // 尝试加锁
  39.                     // waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
  40.                     // leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
  41.                     boolean lock = fairLock.tryLock(300030, TimeUnit.MILLISECONDS);
  42.                     if (lock){
  43.                         log.info("线程:" + Thread.currentThread().getName() + "获得了锁");
  44.                         log.info("剩余数量:{}", --n);
  45.                     }
  46.                 } catch (InterruptedException e) {
  47.                     e.printStackTrace();
  48.                 } finally {
  49.                     log.info("线程:" + Thread.currentThread().getName() + "准备释放锁");
  50.                     // 注意,无论出现任何情况,都要主动解锁
  51.                     fairLock.unlock();
  52.                 }
  53.             });
  54.         }
  55.         try {
  56.             // ->_-> 这里使当前方法占用的线程休息10秒,不要立即结束
  57.             Thread.sleep(10000);
  58.         } catch (InterruptedException e) {
  59.             e.printStackTrace();
  60.         }
  61.     }
  62. }

二、Redisson源码分析

redisson这个框架的实现依赖了Lua脚本和Netty,以及各种Future及FutureListener的异步、同步操作转换,加锁和解锁过程中还巧妙地利用了redis的发布订阅功能

简单了解Lua脚本(Redisson 版本 3.16.8)

想要真正读懂redisson底层的加锁解锁实现,基本的lua脚本还是要了解一下的,这里作者就不深入了(毕竟我也是门外汉),大家有兴趣的可以去深入一下,这里只针对锁的实现来解释一下

加锁脚本
  • KEYS[1] 锁的名字

  • ARGV[1] 锁自动失效时间(毫秒,默认30s(看门狗续期时长))

  • ARGV[2] hash子项的key(uuid+threadId)

  1. --如果锁不存在
  2. if (redis.call('exists', KEYS[1]) == 0) then
  3. --重入次数初始为0后加一
  4. redis.call('hincrby', KEYS[1], ARGV[2], 1);
  5. --设锁的过期时间
  6. redis.call('pexpire', KEYS[1], ARGV[1]);
  7. --返回null-代表加锁成功
  8. return nil;
  9. --结束符
  10. end;
  11. --如果加锁的进程已存在
  12. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
  13. --重入次数加一
  14. redis.call('hincrby', KEYS[1], ARGV[2], 1);
  15. --更新锁的过期时间(毫秒)
  16. redis.call('pexpire', KEYS[1], ARGV[1]);
  17. --返回null-代表重入成功
  18. return nil;
  19. --结束符
  20. end;
  21. --返回锁的剩余时间(毫秒)-代表加锁失败
  22. return redis.call('pttl', KEYS[1]);
741a5b9fa82dcd609d77225f4c747c4d.png

结论:当且仅当返回nil,才表示加锁成功;

解锁脚本

  • KEYS[1] 锁的名字

  • KEYS[2] 发布订阅的channel=redisson_lock__channel:{lock_name}

  • ARGV[1] 发布订阅中解锁消息=0

  • ARGV[2] 看门狗续期时间

  • ARGV[3] hash子项的key=uuid+threadId

  1. --如果锁不存在
  2. if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
  3. --返回null-代表解锁成功
  4. return nil;
  5. end;
  6. --重入次数减一
  7. local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
  8. --如果重入次数不为0,对锁进行续期(使用看门狗的续期时间,默认续期30s)
  9. if (counter > 0) then
  10. redis.call('pexpire', KEYS[1], ARGV[2]);
  11. --返回0-代表锁的重入次数减一,解锁成功
  12. return 0;
  13. --否则重入次数<=0
  14. else
  15. --删除key
  16. redis.call('del', KEYS[1]);
  17. --向channel中发布删除key的消息
  18. redis.call('publish', KEYS[2], ARGV[1]);
  19. --返回1-代表锁被删除,解锁成功
  20. return 1;
  21. end;
  22. return nil;
a0e4a7f356c27bd732186f41598cc1c8.png

结论:当且仅当返回1,才表示当前请求真正解锁;

看门口续期lua脚本

io.netty.util.TimerTask每10秒执行一次(30(续期时间)/3)

  • KEYS[1] 锁的名字

  • ARGV[1]

  1. --自己加的锁存在
  2. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
  3. --续期
  4. redis.call('pexpire', KEYS[1], ARGV[1]);
  5. --1代表续期成功
  6. return 1;
  7. end;
  8. --自己加的锁不存在,后续不需要再续期
  9. return 0;

源码鉴赏

加锁逻辑

  1. // tryLock 是Redisson加锁的核心代码,在这里,我们基本可以了解加锁的整个逻辑流程
  2. @Override
  3. public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  4.     // 获取锁能容忍的最大等待时长
  5.     long time = unit.toMillis(waitTime);
  6.     // 获取当前系统时间 - 节点一
  7.     long current = System.currentTimeMillis();
  8.     // 获取当前线程id
  9.     long threadId = Thread.currentThread().getId();
  10.     
  11.     // 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁
  12.     Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
  13.     if (ttl == null) {
  14.         return true;
  15.     }
  16.     
  17.    // 剩余等待时长 =   最大等待时长-(当前时间-节点一)
  18.     time -= System.currentTimeMillis() - current;
  19.     if (time <= 0) {
  20.         acquireFailed(waitTime, unit, threadId);
  21.         return false;
  22.     }
  23.     
  24.     current = System.currentTimeMillis();
  25.     
  26.     // 【核心点2】订阅解锁消息
  27.     CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
  28.     try {
  29.         // 以阻塞的方式获取订阅结果,最大等待时常time
  30.         subscribeFuture.toCompletableFuture().get(time, TimeUnit.MILLISECONDS);
  31.     } catch (ExecutionException | TimeoutException e) {
  32.         // 判断异步任务是否不存在
  33.         if (!subscribeFuture.cancel(false)) {
  34.             subscribeFuture.whenComplete((res, ex) -> {
  35.                 // 异步任务出现异常,取消订阅
  36.                 if (ex == null) {
  37.                     unsubscribe(res, threadId);
  38.                 }
  39.             });
  40.         }
  41.         acquireFailed(waitTime, unit, threadId);
  42.         return false;
  43.     }
  44.     try {
  45.         // 剩余等待时长
  46.         time -= System.currentTimeMillis() - current;
  47.         if (time <= 0) {
  48.             acquireFailed(waitTime, unit, threadId);
  49.             return false;
  50.         }
  51.     
  52.     // 循环获取锁      
  53.         while (true) {
  54.             long currentTime = System.currentTimeMillis();
  55.             // 再次获取锁,成功则返回
  56.             ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
  57.             // lock acquired
  58.             if (ttl == null) {
  59.                 return true;
  60.             }
  61.             time -= System.currentTimeMillis() - currentTime;
  62.             if (time <= 0) {
  63.                 acquireFailed(waitTime, unit, threadId);
  64.                 return false;
  65.             }
  66.             currentTime = System.currentTimeMillis();
  67.             
  68.             // 【核心点3】阻塞等待信号量唤醒或者超时,接收到订阅时唤醒
  69.          // 使用的是Semaphore#tryAcquire()
  70.            // 判断 锁的占有时间(ttl)是否小于等待时间  
  71.             if (ttl >= 0 && ttl < time) {
  72.                 commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  73.             } else {
  74.                 commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
  75.             }
  76.             time -= System.currentTimeMillis() - currentTime;
  77.             if (time <= 0) {
  78.                 acquireFailed(waitTime, unit, threadId);
  79.                 return false;
  80.             }
  81.         }
  82.     } finally {
  83.         // 因为是同步操作,所以无论加锁成功或失败,都取消订阅
  84.         unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
  85.     }
  86. }

接下来,我们再一起看一下 tryAcquire()

  1. private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  2.     // get方法就是以线程阻塞的方式获取结果,这里不再展示,有兴趣的朋友可以自行查看源码
  3.     return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
  4. }
  1. /**
  2.  * 异步的方式尝试获取锁
  3.  */
  4. private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  5.         RFuture<Long> ttlRemainingFuture;
  6.      
  7.         // 占有时间等于 -1 表示会一直持有锁,直到业务进行完成,主动解锁(这里就显示出了finally的重要性)
  8.         if (leaseTime != -1) {
  9.             // 【核心点4】这里就是直接使用lua脚本
  10.             ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  11.         } else {
  12.             ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
  13.                     TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  14.         }
  15.         CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
  16.             // lock acquired
  17.             if (ttlRemaining == null) {
  18.                 if (leaseTime != -1) {
  19.                     internalLockLeaseTime = unit.toMillis(leaseTime);
  20.                 } else {
  21.                     scheduleExpirationRenewal(threadId);
  22.                 }
  23.             }
  24.             return ttlRemaining;
  25.         });
  26.         return new CompletableFutureWrapper<>(f);
  27.     }
  1. /**
  2.  * 兄弟们看到这里应该就恍然大悟了吧,redisson最底层就是lua脚本的直接调用
  3.  * 这里就不作说明了,上面的lua脚本分析已经说的很明白了
  4.  */
  5. <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  6.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
  7.                 "if (redis.call('exists', KEYS[1]) == 0) then " +
  8.                         "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  9.                         "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  10.                         "return nil; " +
  11.                         "end; " +
  12.                         "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  13.                         "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  14.                         "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  15.                         "return nil; " +
  16.                         "end; " +
  17.                         "return redis.call('pttl', KEYS[1]);",
  18.                 Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
  19.     }

解锁逻辑

  1. /**
  2. * 解锁逻辑
  3. */
  4. @Override
  5. public void unlock() {
  6.     try {
  7.         // 相信看来上面的逻辑,兄弟们应该已经很熟悉redisson作者的编码习惯了,照例get还是以线程阻塞的方式获取结果
  8.         get(unlockAsync(Thread.currentThread().getId()));
  9.     } catch (RedisException e) {
  10.         if (e.getCause() instanceof IllegalMonitorStateException) {
  11.             throw (IllegalMonitorStateException) e.getCause();
  12.         } else {
  13.             throw e;
  14.         }
  15.     }
  16. }
  1. // 异步解锁
  2. @Override
  3. public RFuture<Void> unlockAsync(long threadId) {
  4.     // 调用异步解锁方法
  5.  // unlockInnerAsync() 是抽象方法,所有的实现方法都是直接调用lua脚本,这里不做展示,感兴趣的同学可以自行查看
  6.  // 调用真正的实现取决于上面我们创建什么样的lock对象        
  7.     RFuture<Boolean> future = unlockInnerAsync(threadId);
  8.     CompletionStage<Void> f = future.handle((opStatus, e) -> {
  9.         cancelExpirationRenewal(threadId);
  10.         if (e != null) {
  11.             throw new CompletionException(e);
  12.         }
  13.         if (opStatus == null) {
  14.             IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
  15.                     + id + " thread-id: " + threadId);
  16.             throw new CompletionException(cause);
  17.         }
  18.         return null;
  19.     });
  20.     return new CompletableFutureWrapper<>(f);
  21. }

来源:blog.csdn.net/weixin_44030143/

article/details/130825037

后端专属技术群

构建高质量的技术交流社群,欢迎从事编程开发、技术招聘HR进群,也欢迎大家分享自己公司的内推信息,相互帮助,一起进步!

文明发言,以交流技术职位内推行业探讨为主

广告人士勿入,切勿轻信私聊,防止被骗

dd4d88991b72e5c20369fa91d2d3935c.png

加我好友,拉你进群
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/710714
推荐阅读
  

闽ICP备14008679号