当前位置:   article > 正文

Redisson源码初探(七) MutiLock_multilock.trylock false

multilock.trylock false

MutiLock  说明东西?就是可以将多个锁合并成一个大锁,对一个大锁进行统一的申请和释放。其实就是一次性的去锁定多个资源,然后处理业务,最后统一释放

我们看源码之前先思考一下,这个如果基于之前的一些思想,如何去实现这个功能????难道是就是对多个所RedissonLock 依次去加锁?所有的锁加锁成功就代表MultiLock加锁成功?

我们还是先看看Redisson如何使用,也就是我们看源码的入口对把

  1. public static void main(String[] args) throws Exception {
  2. //构建一个配置信息对象
  3. Config config = new Config();
  4. config.useClusterServers()
  5. //定时扫描连接信息 默认1000ms
  6. .setScanInterval(2000)
  7. .addNodeAddress("redis://127.0.0.1:7001");
  8. //因为Redisson 是基于redis封装的一套便于复杂操作的框架
  9. //所以这里构建对象肯定是创建一些与redis的连接
  10. RedissonClient redisson = Redisson.create(config);
  11. RLock lock1 = redisson.getLock("lock1");
  12. RLock lock2 = redisson.getLock("lock2");
  13. RedissonMultiLock multiLock = new RedissonMultiLock(lock1,lock2);
  14. multiLock.lock();
  15. //释放锁
  16. multiLock.unlock();
  17. }

感觉就是去将多个RLock锁实例进行统一的操作管理?

我们看看实例化MultiLock传入的参数干嘛了,将其存入到了一个list 数组中,记住名字 locks

  1. final List<RLock> locks = new ArrayList<RLock>();
  2. public RedissonMultiLock(RLock... locks) {
  3. if (locks.length == 0) {
  4. throw new IllegalArgumentException("Lock objects are not defined");
  5. }
  6. this.locks.addAll(Arrays.asList(locks));
  7. }

进入到RedissonMultiLock 内部,现在基本上和之前差不多的代码

  1. @Override
  2. public void lock() {
  3. try {
  4. lockInterruptibly();
  5. } catch (InterruptedException e) {
  6. Thread.currentThread().interrupt();
  7. }
  8. }
  1. @Override
  2. public void lockInterruptibly() throws InterruptedException {
  3. lockInterruptibly(-1, null);
  4. }
  1. public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
  2. /基础等待时间 这里我们按照3个lock 来统一计算 baseWaitTime = 4500MS
  3. long baseWaitTime = locks.size() * 1500;
  4. long waitTime = -1;
  5. //这里还是支持一些获取锁超时机制
  6. //计算一些等待时间
  7. if (leaseTime == -1) {
  8. waitTime = baseWaitTime;
  9. unit = TimeUnit.MILLISECONDS;
  10. } else {
  11. waitTime = unit.toMillis(leaseTime);
  12. if (waitTime <= 2000) {
  13. waitTime = 2000;
  14. } else if (waitTime <= baseWaitTime) {
  15. waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
  16. } else {
  17. waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
  18. }
  19. waitTime = unit.convert(waitTime, TimeUnit.MILLISECONDS);
  20. }
  21. while (true) {
  22. //尝试获取锁 死循环,不停的尝试获取所有需要的锁,
  23. //只要没有全部获取到
  24. //或者未超时那么就会一直死循环
  25. if (tryLock(waitTime, leaseTime, unit)) {
  26. return;
  27. }
  28. }
  29. }

其实我们大概看了一下,MultiLock 这个东西最主要的逻辑不是lua脚本来实现,其实是一个 high level 高层次的API,基于low level 低层次的API 进行了封装 

  1. public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  2. // try {
  3. // return tryLockAsync(waitTime, leaseTime, unit).get();
  4. // } catch (ExecutionException e) {
  5. // throw new IllegalStateException(e);
  6. // }
  7. long newLeaseTime = -1;
  8. if (leaseTime != -1) {
  9. newLeaseTime = unit.toMillis(waitTime)*2;
  10. }
  11. long time = System.currentTimeMillis();
  12. long remainTime = -1;
  13. if (waitTime != -1) {
  14. //设置remainTime 这个是个什么参数?猜一下
  15. remainTime = unit.toMillis(waitTime);
  16. }
  17. //上锁等待时间
  18. long lockWaitTime = calcLockWaitTime(remainTime);
  19. int failedLocksLimit = failedLocksLimit();
  20. //保存已经获取到的锁
  21. List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
  22. for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
  23. RLock lock = iterator.next();
  24. boolean lockAcquired;
  25. try {
  26. if (waitTime == -1 && leaseTime == -1) {
  27. lockAcquired = lock.tryLock();
  28. } else {
  29. long awaitTime = Math.min(lockWaitTime, remainTime);
  30. //这里底层获取锁
  31. //这里使用的还是RedissonLock.tryLock(awaitTime, newLeaseTime。。。)方法,这个方法别忘记 4500毫秒必须获取到锁,如果获取不到就标记为失败,然后会启动一个watchdog 不停的去锁对应key的 过期时间,除非你传入了一个leaseTime 时间,就不会启动一个watchdog
  32. lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
  33. }
  34. } catch (Exception e) {
  35. lockAcquired = false;
  36. }
  37. if (lockAcquired) {
  38. acquiredLocks.add(lock);
  39. } else {
  40. if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
  41. break;
  42. }
  43. if (failedLocksLimit == 0) {
  44. unlockInner(acquiredLocks);
  45. if (waitTime == -1 && leaseTime == -1) {
  46. return false;
  47. }
  48. failedLocksLimit = failedLocksLimit();
  49. acquiredLocks.clear();
  50. // reset iterator
  51. while (iterator.hasPrevious()) {
  52. iterator.previous();
  53. }
  54. } else {
  55. failedLocksLimit--;
  56. }
  57. }
  58. //计算剩余时间,什么时间?就是去获取大锁的允许的时间长
  59. //这块逻辑是意思?就是看你尝试获取多个锁的时间 是否超过了设置的时间
  60. //最多不能超过假设4500ms
  61. //超过了那么就会释放掉所有的锁,同时返回false,获取MultiLock 失败
  62. //然后返回false 之后,会返回重新进入该方法,再来尝试4500ms中获取到multiLock
  63. if (remainTime != -1) {
  64. remainTime -= (System.currentTimeMillis() - time);
  65. time = System.currentTimeMillis();
  66. if (remainTime <= 0) {
  67. unlockInner(acquiredLocks);
  68. return false;
  69. }
  70. }
  71. }
  72. if (leaseTime != -1) {
  73. List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
  74. for (RLock rLock : acquiredLocks) {
  75. RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
  76. futures.add(future);
  77. }
  78. for (RFuture<Boolean> rFuture : futures) {
  79. rFuture.syncUninterruptibly();
  80. }
  81. }
  82. return true;
  83. }

加锁其实到这已经结束了

那么如何释放锁?其实肯定也是调用底层原本RedissonLock 的API,最后肯定执行的是lua脚本对吧,这里我们就不详细讲了,看下源码打架都能理解

  1. @Override
  2. public void unlock() {
  3. List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(locks.size());
  4. //依次调用底层的RedissonLock方法异步释放锁
  5. for (RLock lock : locks) {
  6. futures.add(lock.unlockAsync());
  7. }
  8. //异步转同步
  9. for (RFuture<Void> future : futures) {
  10. future.syncUninterruptibly();
  11. }
  12. }
  1. @Override
  2. public RFuture<Void> unlockAsync() {
  3. long threadId = Thread.currentThread().getId();
  4. return unlockAsync(threadId);
  5. }
  1. @Override
  2. public RFuture<Void> unlockAsync(final long threadId) {
  3. final RPromise<Void> result = new RedissonPromise<Void>();
  4. RFuture<Boolean> future = unlockInnerAsync(threadId);
  5. future.addListener(new FutureListener<Boolean>() {
  6. @Override
  7. public void operationComplete(Future<Boolean> future) throws Exception {
  8. if (!future.isSuccess()) {
  9. cancelExpirationRenewal(threadId);
  10. result.tryFailure(future.cause());
  11. return;
  12. }
  13. Boolean opStatus = future.getNow();
  14. if (opStatus == null) {
  15. IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
  16. + id + " thread-id: " + threadId);
  17. result.tryFailure(cause);
  18. return;
  19. }
  20. if (opStatus) {
  21. cancelExpirationRenewal(null);
  22. }
  23. result.trySuccess(null);
  24. }
  25. });
  26. return result;
  27. }

 

  1. protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  2. return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  3. "if (redis.call('exists', KEYS[1]) == 0) then " +
  4. "redis.call('publish', KEYS[2], ARGV[1]); " +
  5. "return 1; " +
  6. "end;" +
  7. "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  8. "return nil;" +
  9. "end; " +
  10. "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  11. "if (counter > 0) then " +
  12. "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  13. "return 0; " +
  14. "else " +
  15. "redis.call('del', KEYS[1]); " +
  16. "redis.call('publish', KEYS[2], ARGV[1]); " +
  17. "return 1; "+
  18. "end; " +
  19. "return nil;",
  20. Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
  21. }

 

 

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/200961
推荐阅读
相关标签
  

闽ICP备14008679号