赞
踩
基于Redis的分布式MultiLock对象允许对Lock对象进行分组并将它们作为单个锁进行处理。每个RLock对象可能属于不同的Redisson实例。
如果获取的Redisson实例MultiLock崩溃,那么它可能永远挂在获取状态。为了避免这种情况,Redisson维护了一个锁看门狗,它会在持有者Redisson实例处于活动状态时延长锁过期时间。默认情况下,锁定看门狗超时为30s,可以通过Config.lockWatchdogTimeout设置进行更改。作者的另外一篇文章有对看门狗机制有解析:基于Redisson的可重入分布式锁
leaseTime:在指定的时间间隔后锁将自动释放
MultiLock对象的行为符合java锁规范。这意味着只有锁的拥有者线程才能解锁它,否则会抛出IllegalMonitorStateException异常。否则考虑使用RSemaphore对象。
普通使用示例:
RLock lock1 = redisson1.getLock("lock1"); RLock lock2 = redisson2.getLock("lock2"); RLock lock3 = redisson3.getLock("lock3"); RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3); // traditional lock method multiLock.lock(); // or acquire lock and automatically unlock it after 10 seconds multiLock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds boolean res = multiLock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { multiLock.unlock(); } }
Async接口使用的代码示例:
RLock lock1 = redisson1.getLock("lock1"); RLock lock2 = redisson2.getLock("lock2"); RLock lock3 = redisson3.getLock("lock3"); RLock multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3); RFuture<Void> lockFuture = multiLock.lockAsync(); // or acquire lock and automatically unlock it after 10 seconds RFuture<Void> lockFuture = multiLock.lockAsync(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds RFuture<Boolean> lockFuture = multiLock.tryLockAsync(100, 10, TimeUnit.SECONDS); lockFuture.whenComplete((res, exception) -> { // ... multiLock.unlockAsync(); });
Reactive接口使用的代码示例:
RedissonReactiveClient anyRedisson = redissonClient.reactive(); RLockReactive lock1 = redisson1.getLock("lock1"); RLockReactive lock2 = redisson2.getLock("lock2"); RLockReactive lock3 = redisson3.getLock("lock3"); RLockReactive multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3); Mono<Void> lockMono = multiLock.lock(); // or acquire lock and automatically unlock it after 10 seconds Mono<Void> lockMono = multiLock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds Mono<Boolean> lockMono = multiLock.tryLock(100, 10, TimeUnit.SECONDS); lockMono.doOnNext(res -> { // ... }) .doFinally(multiLock.unlock()) .subscribe();
RxJava3接口使用的代码示例:
RedissonRxClient anyRedisson = redissonClient.rxJava(); RLockRx lock1 = redisson1.getLock("lock1"); RLockRx lock2 = redisson2.getLock("lock2"); RLockRx lock3 = redisson3.getLock("lock3"); RLockRx multiLock = anyRedisson.getMultiLock(lock1, lock2, lock3); Completable lockRes = multiLock.lock(); // or acquire lock and automatically unlock it after 10 seconds Completable lockRes = multiLock.lock(10, TimeUnit.SECONDS); // or wait for lock aquisition up to 100 seconds // and automatically unlock it after 10 seconds Single<Boolean> lockRes = multiLock.tryLock(100, 10, TimeUnit.SECONDS); lockRes.doOnSuccess(res -> { // ... }) .doFinally(multiLock.unlock()) .subscribe();
// 这里相对简单,就是创建了一个RLock集合,为了后续分别去获取锁
final List<RLock> locks = new ArrayList<>();
@Override
public RLock getMultiLock(RLock... locks) {
return new RedissonMultiLock(locks);
}
public RedissonMultiLock(RLock... locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
}
this.locks.addAll(Arrays.asList(locks));
}
leaseTime:指定加锁的时间。超过这个时间后锁便自动解开了。
为了方便我们的源码分析,假设我们的locks的size为6。leaseTime为2s
@Override public void lock(long leaseTime, TimeUnit unit) { try { lockInterruptibly(leaseTime, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } @Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { // 基础等待时间设置为连锁数量*1500,单位是毫秒 // 6*1500=9000ms 也就是9s long baseWaitTime = locks.size() * 1500; // 设置等待时间为-1 long waitTime = -1; // 如果锁释放的时间为-1,就让等待时间等于基础等待时间9s // lock的无参方法默认leaseTime=-1 if (leaseTime == -1) { waitTime = baseWaitTime; } else { // 如果锁的释放时间不为-1,把leaseTime转为毫秒 leaseTime = unit.toMillis(leaseTime); // 把锁的释放时间传给等待时间,如果leaseTime=2s那么waitTime也等于2s waitTime = leaseTime; if (waitTime <= 2000) { // 也就是说leaseTime即使小于2s,waitTime也会被重置为2s waitTime = 2000; } else if (waitTime <= baseWaitTime) { // 如果leaseTime大于2s,并且小于9s,将重新设置等待时间,我们暂且还不知道这个等待时间做什么用。 // 如果leaseTime等于6,那么waitTime=6,此时waitTime小于9s,重新设置waitTime // 将waitTime设置为大于等于3小于6的整数。(此处不明白看下面的解释) waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime); } else { // 如果leaseTime大于2s而且大于9s(baseWaitTime),同样重新设置waitTime的值 // 如果传入的leaseTime=10s,那么waitTime一开始也是10s,并且大于baseWaitTime的9s // 将waitTime设置为大于等于9s,小于10s的整数。 waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime); } } while (true) { // 传入waitTime开始尝试获取锁了 if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) { return; } } }
ThreadLocalRandom.current().nextLong(origin, bound)是用于生成一个指定范围内的随机长整数。
具体解释如下:
ThreadLocalRandom.current() 返回当前线程的 ThreadLocalRandom 实例,用于生成随机数。
nextLong(origin, bound) 生成一个介于 origin(包含)和 bound(不包含)之间的随机长整型数。这意味着生成的随机数大于等于 origin,并且小于 bound。
此处为什么需要去修改waitTime的值,为什么还得整个随机数,使用baseWaitTime调整waitTime的作用是什么?
waitTime:表示尝试获取锁的等待时间。它指定了在尝试获取锁时最长的等待时间。
leaseTime: 指定加锁的时间。超过这个时间后锁便自动解开了。
TimeUnit:时间单位
// 假设传入的waitTime=2s leaseTime=2s public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 定义了一个新的释放时间newLeaseTime=-1 long newLeaseTime = -1; // 如果传入了时间的tryLock,leaseTime就不等于-1,不传默认值为-1 if (leaseTime != -1) { // 将新的锁释放时间设置为waitTime的2倍,单位是毫秒,也就是4000ms newLeaseTime = unit.toMillis(waitTime)*2; } // 获取当前时间(毫秒) long time = System.currentTimeMillis(); // remain==保持,先翻译为保持时间,定义为-1 long remainTime = -1; if (waitTime != -1) { // 保持时间设置为waitTime,2000ms remainTime = unit.toMillis(waitTime); } // calcLockWaitTime(remainTime);-->return Math.max(remainTime / locks.size(), 1); // 300ms=lockWaitTime long lockWaitTime = calcLockWaitTime(remainTime); // return 0 int failedLocksLimit = failedLocksLimit(); List<RLock> acquiredLocks = new ArrayList<>(locks.size()); // 循环获取锁 for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { // 获取到的redisson实例生成的锁 RLock lock = iterator.next(); // 锁获取标识 boolean lockAcquired; try { if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { // awaitTime=300ms long awaitTime = Math.min(lockWaitTime, remainTime); // 直接去获取锁,返回true or false lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (RedisResponseTimeoutException e) { // 如果发生了RedisResponseTimeoutException,会先解锁。因为这个时候不确定是否加锁成功了,所以解锁设置标识为失败。 unlockInner(Arrays.asList(lock)); lockAcquired = false; } catch (Exception e) { // 其他异常设置标识为false lockAcquired = false; } if (lockAcquired) { // 如果加锁成功 放入集合中 acquiredLocks.add(lock); } else { // 6-当前成功的数量=0,直接退出循环,也就是说超过了最大的失败限制 // 这里RedissonRedLock有重写,红锁有自己的规则 if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { break; } // failedLocksLimit==0,那么只要失败就进入这个逻辑 if (failedLocksLimit == 0) { // 会把获取到锁的一次性解锁 unlockInner(acquiredLocks); if (waitTime == -1 && leaseTime == -1) { return false; } // 重置failedLocksLimit=0 failedLocksLimit = failedLocksLimit(); // 清空获取到锁的集合 acquiredLocks.clear(); // reset iterator while (iterator.hasPrevious()) { iterator.previous(); } } else { // RedissonRedLock才会进入这个逻辑 failedLocksLimit--; } } // 如果remainTime不为-1 // remainTime=2000ms if (remainTime != -1) { // 查看remainTime的剩余时间 remainTime -= System.currentTimeMillis() - time; // 重置time time = System.currentTimeMillis(); // 如果保持时间也就是之前的waitTime小于0,也就是说超过了尝试获取锁时最长的等待时间,释放所有已获得的锁,并返回false,加锁失败 if (remainTime <= 0) { unlockInner(acquiredLocks); return false; } } } // 如果没有超过尝试获取锁时最长等待时间,并且leaseTime不为-1 if (leaseTime != -1) { // 创建了一个RFuture集合 List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size()); for (RLock rLock : acquiredLocks) { //为每个锁设置过期时间,是一个异步的操作 RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); } for (RFuture<Boolean> rFuture : futures) { // 阻塞当前线程,同步等待每个异步操作的结果 rFuture.syncUninterruptibly(); } } return true; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。