赞
踩
为了防止分布式系统中的多个进程之间相互干扰,我们需要一种分布式协调技术来对这些进程进行调度。
而这个分布式协调技术的核心就是来实现分布式锁
。
分布式锁有很多,常用的有MySQL的、zookeeper的和Redis的
;
今天我就来分析分析Redis
分布式锁中的红锁
和联锁
;
重点分析的红锁,联锁只是顺便;
在讲解之前,先给出,获取Redis
分布式锁的本质:
Redis
获取分布式锁,其实就是向N
个Redis
实例中使用SETNX
来对该resource
设置键值。
解锁
:就是反向操作,删除掉resource
的键值;
一开始只是想研究红锁源码,但是发现红锁源码是基于联锁源码的,
所以联锁的情况一起分析了下;
一般来说:“分布式锁”是用来解决分布式应用中“并发冲突”
;
我们来看个例子:
上面是一个股票自选股的页面,window
电脑快捷键中:
insert键
可以添加一只股票到自选股中
delete键
可以在自选股中删除一只股票
现在有人快速的对某只股票狂按
这两个快捷键:比如一秒内按了三个来回;
后端接口判断逻辑很简单:
判断添加的股票是否已经在自选股中;
① 不在:添加;
② 在:不做任何操作;(实际情况会有业务操作,比如排序)
这个时候会很容易出现并发问题:自选股重复添加问题;
比如:
①添加操作请求过来时,还没有执行完;
②删除请求也来了,但是它执行完了;
③而后又有一个 添加请求来了;
④此时两个添加请求操作都判定自选股里没有这个股票,就会出现重复添加的问题;
这里只考虑依靠代码来解决并发问题,不考虑数据库唯一键的问题;
实际上,因为采用的是mongodb数据库,自选股的数据是存放在数组里,也没办法设置唯一键;
如果服务部署的是单例,只需要在代码关键地方加个锁就行了;
但是现在服务大多都是集群部署;有多个节点;这个时候就需要分布式锁
来解决问题;
这里先了解一个概念;
租约时间(leaseTime
): Redis
键key
的过期时间;
等待时间(waitTime
): 请求获取锁的等待时间;
红锁算法思想:
为了取到锁,客户端应该执行以下操作
:
Unix
时间,以毫秒为单位。key
和随机值获取锁。在步骤2,当向Redis
设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis
已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。3个节点
)的Redis
节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。key
的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3
计算的结果)。N/2+1
个Redis
实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis
实例上进行解锁(即便某些Redis
实例根本就没有加锁成功)。官方地址:http://redis.cn/topics/distlock.html
联锁的思想很简单:N个实例都必须获取到锁,有一个失败,即为失败。
Java11
Redisson: 3.1.3.6
为了方便分析代码,假设集群里三个Redis实例;
起始调用代码:
long leaseTime = 3L;
lock.lock(leaseTime, TimeUnit.SECONDS);
RedissonRedLock redLock = new RedissonRedLock(lock);
// 有参数的情况
redLock.lock(leaseTime, TimeUnit.SECONDS);
这里是以redLock.lock(leaseTime, TimeUnit.SECONDS)
方法为入口,开始分析的;
源码:
/**
* leaseTime租约时间,也就是键key的过期时间
*/
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
接着会调用lockInterruptibly()
方法:
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { // 以毫秒来算的话,1500==1.5s * 3 = 4.5s // 默认等待时间为4.5s long baseWaitTime = locks.size() * 1500; long waitTime = -1; if (leaseTime == -1) { // 当没有设置租约时间时,waitTime 等于默认4.5s waitTime = baseWaitTime; } else { // 设置了租约时间的情况 // 将前端传入的租约时间转为毫秒 leaseTime = unit.toMillis(leaseTime); // 租约时间赋值给waitTime waitTime = leaseTime; if (waitTime <= 2000) { // 小于 2s情况 waitTime = 2000; } else if (waitTime <= baseWaitTime) { // ThreadLocalRandom.current() 多线程下生成随机数 // 假设传入3s,那么就会在(3/2=1s, 3s)之间产生随机数赋值给waitTime waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime); } else { // 假设传入6s,那么就会在(4.5s, 6s)之间产生随机数赋值给waitTime waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime); } } // 假设leaseTime传入的是3s, 根据随机策略(1s, 3s),假设生成随机数是2s while (true) { // waitTime = 2s, leaseTime = 3s if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) { // 可以看出lock方法只能成功,才会生成退出。否则就得报异常退出 return; } } }
通过上面分析,可以知道lockInterruptibly()
方法主要做的事情:
① 生成一个小于leaseTime
的随机数赋值给waitTime
;
② 如果没有设置leaseTime
,那么就用默认值赋值给waitTime
;
③这个方法只有在外界不传入waitTime
参数时,才会被调用;
Q:这里不由得思考下,为什么这里采用了随机数?
该代码块在lock()方法被调用的情况下就会执行,而该方法是没有waitTime参数的,只有leaseTime参数;A: 最直观的 解释是因为tryLock需要这个参数;
leaseTime:这个字段是租约时间,也就是键key的过期时间。
为了保证获取到锁时,键key没过期,waitTime的值无论怎么生成都是小于leaseTime值的;
又因为前端没办法传入waitTime,通常策略就是随机生成指定范围的数;
接着我们分析:tryLock(waitTime, leaseTime, unit)
源码:
// waitTime = 2s, leaseTime = 3s, 时间单位毫秒 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1; // 设置了租约时间的情况 // 这段逻辑 就是对有租约时间的情况下,设置newLeaseTime,实际获取分布式锁时用到 if (leaseTime != -1) { if (waitTime == -1) { newLeaseTime = unit.toMillis(leaseTime); } else { // 设置了等待时间的场景 // 根据我假设的情况,这里2 * 2 = 4s; // 这里乘以2,个人认为,从申请获取锁到真正获取到锁是有时间消耗的, // 为了防止获取到的锁不至于立马过期,所以乘以2,其实我觉得leaseTime*2也可以; // 因为从代码最后释放锁的逻辑来看,这里的租约时间多长,并不会影响最后锁的统一释放 newLeaseTime = unit.toMillis(waitTime)*2; } } // 当前时间 毫秒 long time = System.currentTimeMillis(); // 总的等待时间 long remainTime = -1; if (waitTime != -1) { // 因为waitTime=2s,所以remainTime=2s remainTime = unit.toMillis(waitTime); } // 这里红锁代码重写了calcLockWaitTime()方法 // 根据下面代码calcLockWaitTime分析,此时lockWaitTime=1s; // 计算每个锁的等待时间 在联锁的场景下,就等于remainTime long lockWaitTime = calcLockWaitTime(remainTime); // 允许获取锁失败的次数 在联锁的场景下,固定为0 int failedLocksLimit = failedLocksLimit(); List<RLock> acquiredLocks = new ArrayList<>(locks.size()); // 循环每个redis客户端,去获取锁 for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { RLock lock = iterator.next(); boolean lockAcquired; try { if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { // 红锁的情况中,根据计算规则肯定是取lockWaitTime // lockWaitTime = 1s, remainTime=2s 取最小值 1s long awaitTime = Math.min(lockWaitTime, remainTime); // awaitTime=1s, newLeaseTime=4s 开始尝试获取锁 lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (RedisResponseTimeoutException e) { unlockInner(Arrays.asList(lock)); lockAcquired = false; } catch (Exception e) { lockAcquired = false; } if (lockAcquired) { // 如果获取到了,就记录起来 acquiredLocks.add(lock); } else { // 失败的话,比较下是否到了失败次数 if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { // 3 - 2 也就是说 只有在成功两个,失败一个情况下,才会执行这里 // 换句话说,红锁的场景下走这里,联锁场景下一定不执行这里 break; } // 重试机制 if (failedLocksLimit == 0) { unlockInner(acquiredLocks); if (waitTime == -1) { return false; } // 重置失败次数、锁列表、遍历游标,这说明要进行重试了 failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); // reset iterator while (iterator.hasPrevious()) { iterator.previous(); } } else { // 红锁失败场景执行,因为联锁的场景中failedLocksLimit=0 failedLocksLimit--; } } // 超时控制代码块 if (remainTime != -1) { // System.currentTimeMillis() - time 单个Redis实例获取锁花费的时间 // remainTime = remainTime - System.currentTimeMillis() - time; remainTime -= System.currentTimeMillis() - time; time = System.currentTimeMillis(); if (remainTime <= 0) { // 从这段逻辑可以看出 remainTime 就是总的等待时间,如果超过了,还没有走出循环,说明获取锁失败 // 对已经获取到的锁进行释放 unlockInner(acquiredLocks); return false; } } } // leaseTime = 3s // 下面逻辑是,key到了过期时间后,Redis利用异步线程进行删除,释放锁; if (leaseTime != -1) { List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size()); // 设置好过期时间 for (RLock rLock : acquiredLocks) { RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); } // 同步可中断的方式来释放到期锁 for (RFuture<Boolean> rFuture : futures) { rFuture.syncUninterruptibly(); } } return true; }
代码具体干的事情,已有注释;
我们从中知道,实际获取锁的方法是lock.tryLock(...)
,在调用该方法之前,大体的流程图如下:
从流程图中我们知道:
① 目的就是为了计算出三个变量值:newLeaseTime
、awaitTime
和failedLocksLimit
;
前两个是用于实际获取锁的方法参数,最后一个是为了控制重试机制;
② 通过源码分析,联锁也是上面的流程,唯一的区别就是图中标红的部分;也就是在获取awaitTime
和failedLocksLimit
这两个参数的逻辑上;
这种区别Redis
源码是利用两个重载方法来实现的;
(这两个重载的方法:calcLockWaitTime(...)
和failedLocksLimit(...)
红锁情况下,重写的方法 calcLockWaitTime(...)
:
// 红锁重写
// 根据我的假设,此时remainTime=2s,locks.size()=3
@Override
protected long calcLockWaitTime(long remainTime) {
// Math.max(2 / 3, 1) = (0, 1) = 1
// 说明:按照下面算法,应该是每个实例等待时间和1进行比较,取最大值
return Math.max(remainTime / locks.size(), 1);
}
联锁 直接返回
remainTime
,也就是传入什么就返回什么;
红锁情况下,重写的方法 failedLocksLimit
:
// 锁可以失败的次数,锁的数量-锁成功客户端最小的数量
protected int failedLocksLimit() {
return locks.size() - minLocksAmount(locks);
}
联锁是直接返回
0
③ 重试机制,当在遍历Redis
的实例
获取锁失败的情况后,如果没有超时(即<waitTime
),那么Redis
会自动进行重试,再次来一遍,尝试获取锁。
④ 如果到了超时时间,那么是直接return false
结束掉;
⑤ 如果获取锁成功,并设置了leaseTime
那么Redis
会自动释放锁;
大体的流程如下:
// 假设waitTime=1s, leaseTime=3s,单位毫秒 private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { // 设置了租约时间的情况 if (leaseTime != -1) { // tryLockInnerAsync 真正加锁的方法,其会调用脚本执行redis命令插入key return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 没有设置租约时间的情况 // 程序(看门狗)会设置一个默认值为30s的租约时间 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired 获取锁成功 if (ttlRemaining == null) { // 对该设置时间轮询器,也可以理解为监听器 // 该监听器会以租约时间的三分之一的频率,不断延迟租约时间 // 目的是为了防止,业务程序还没有跑完,锁就被释放掉了。 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
scheduleExpirationRenewal(threadId)
的源码:
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 说明该线程已经获取到锁了,
// 内部变量值会加1,即:可重入锁
oldEntry.addThreadId(threadId);
} else {
// 该线程第一次获取到该锁
entry.addThreadId(threadId);
// 启动时间轮询器,开始每隔leaseTime/3的时间,不断去延长租约时间
renewExpiration();
}
}
renewExpiration()
的源码如下:
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // newTimeout 底层使用的是netty // Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { // 有异常,结束 log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { // reschedule itself // 递归调用 renewExpiration(); } }); } // 定时器的时间为租约时间的三分之一 }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
大体的流程如下:
从上面的代码分析我们知道:
① 为了防止业务程序还没有跑完,锁就被释放掉了,Redisson
底层利用netty
实现了一个时间轮询器, 触发的频率为租约时间
的三分之一,
来给没有设置租约时间
的场景进行自动延长租约时间;
② 红锁和联锁都是可重入锁;
@Override <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, // 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1), // 并通过pexpire设置失效时间(也是锁的租约时间) "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'read'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('set', KEYS[2] .. ':1', 1); " + "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间 "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " + "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local key = KEYS[2] .. ':' .. ind;" + "redis.call('set', key, 1); " + "redis.call('pexpire', key, ARGV[1]); " + "local remainTime = redis.call('pttl', KEYS[1]); " + "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " + "return nil; " + "end;" + // 获取分布式锁的KEY的失效时间毫秒数 "return redis.call('pttl', KEYS[1]);", // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2] Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId)); }
获取锁的命令中,
KEYS[1]
就是Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId))
,表示分布式锁的key
;
这里个getName()
方法返回的是name
,这个name
是构造RLock lock = redissonClient.getLock(name);
传进去的。
ARGV[1]
就是internalLockLeaseTime
,即锁的租约时间,默认30s
;
ARGV[2]
就是getLockName(threadId)
,是获取锁时set
的唯一值;
释放锁:
protected RFuture<Boolean> unlockInnerAsync(long threadId) { // 释放锁时需要在redis实例上执行的lua命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 如果分布式锁KEY不存在,那么向channel发布一条消息 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // 如果就是当前线程占有分布式锁,那么将重入次数减1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3] Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
这里是参考:Redlock:Redis分布式锁最牛逼的实现
自此大部分的代码都分析完了;
下面自己又把其他几个类似的方法,一起人肉分析了一遍;
为了不重复贴代码,只贴关键的部分
起始调用代码:
long leaseTime = 3L;
lock.lock(leaseTime, TimeUnit.SECONDS);
RedissonRedLock redLock = new RedissonRedLock(lock);
// 有参数的情况
redLock.lock();
源码:
// 租约时间为 -1 相当于没有设置
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
只贴关键代码:
接着这里会调用lockInterruptibly()
方法,计算waitTime
后,来调用tryLock(long waitTime, long leaseTime, TimeUnit unit)
;
此时:
waitTime=4.5s leaseTime = -1
...
long remainTime = -1;
if (waitTime != -1) {
// remainTime = waitTime = 4.5s
remainTime = unit.toMillis(waitTime);
}
// 这里红锁代码重写了calcLockWaitTime()方法
// 根据下面代码calcLockWaitTime分析,此时lockWaitTime=1s;
// 计算每个锁的等待时间 (4.5/3 = 1, 1)取最大,=1
long lockWaitTime = calcLockWaitTime(remainTime);
// 允许获取锁的失败次数=1
int failedLocksLimit = failedLocksLimit();
...
实际获取锁代码块:
...
// min(1, 4.5) = 1
// 在redLock场景中lockWaitTime永远都会比remainTime值小
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime=1, newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
...
// 无参的情况下leaseTime=-1,也就说无参的情况下,必须手动释放锁;redis不会自动释放
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
...
小结:redLock.lock()
需要手动释放锁;
红锁的代码就是基于联锁实现的,只是下面两个重写的方法实现不同而已:
calcLockWaitTime()
failedLocksLimit()
起始调用代码:
long leaseTime = 3L;
RedissonMultiLock multiLock = new RedissonMultiLock(lock);
multiLock.lock(leaseTime, TimeUnit.SECONDS);
源码分析:
public void lock(long leaseTime, TimeUnit unit) { try { lockInterruptibly(leaseTime, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } // leaseTime=3s public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { // 默认等待时间 long baseWaitTime = locks.size() * 1500; long waitTime = -1; if (leaseTime == -1) { // waitTime = 3 * 1.5 = 4.5s waitTime = baseWaitTime; } else { leaseTime = unit.toMillis(leaseTime); // 3s waitTime = leaseTime; if (waitTime <= 2000) { waitTime = 2000; } else if (waitTime <= baseWaitTime) { // 3 <= 4.5 // 3/2=1, 3 假设随机数为2 waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime); } else { waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime); } } while (true) { // waitTime 2s,leaseTime=3s if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) { return; } } }
可以看出,上面这段代码和红锁是一模一样的逻辑:
此时:
waitTime
=2s ,leaseTime
=3s
只贴主要代码:
long remainTime = -1;
if (waitTime != -1) {
// 2s
remainTime = unit.toMillis(waitTime);
}
// 联锁的场景下,就是remainTime=2s
long lockWaitTime = calcLockWaitTime(remainTime);
// 联锁的场景下,固定为0
int failedLocksLimit = failedLocksLimit();
实际请求锁的代码块:
...
// 联锁的场景下,lockWaitTime和remainTime是相等的;
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime = lockWaitTime = remainTime = 2s,newLeaseTime=4s
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
假设失败了:
if (failedLocksLimit == 0) { // 失败次数用完了 unlockInner(acquiredLocks); if (waitTime == -1) { return false; } // 重置失败次数、锁列表、遍历游标,这说明要进行重试了 failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); // reset iterator while (iterator.hasPrevious()) { iterator.previous(); } } // 时间控制 可以看出上面的逻辑存在重试机制,所以才有下面的超时判断逻辑 if (remainTime != -1) { remainTime -= System.currentTimeMillis() - time; time = System.currentTimeMillis(); if (remainTime <= 0) { unlockInner(acquiredLocks); return false; } }
小结:联锁拥有重试机制,multiLock.lock(leaseTime, TimeUnit.SECONDS)
能自动释放锁;
起始调用代码:
RedissonMultiLock multiLock = new RedissonMultiLock(lock);
multiLock.lock();
在执行完随机生成waitTime逻辑之后;
此时:
waitTime=4.5s leaseTime=-1
贴主要代码:
long remainTime = -1;
if (waitTime != -1) {
// remainTime = waitTime=4.5s
remainTime = unit.toMillis(waitTime);
}
// lockWaitTime = remainTime = 4.5s
long lockWaitTime = calcLockWaitTime(remainTime);
// 固定为0
int failedLocksLimit = failedLocksLimit();
实际执行的代码块:
...
// lockWaitTime = remainTime = 4.5s
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime=4.5s newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
剩下的情况和有参是一样的;
小结:联锁拥有重试机制,multiLock.lock()
需要手动释放锁;
起始调用代码:
RedissonRedLock redLock = new RedissonRedLock(lock);
redLock.tryLock();
public boolean tryLock() {
try {
return tryLock(-1, -1, null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
可以看出,这里是没有调用随机生成waitTime
的逻辑,这里它全部都设置为了-1
。
此时:
waitTime=-1s leaseTime=-1
贴主要代码:
...
// 红锁 Max(-1/3, 1) 为1
long lockWaitTime = calcLockWaitTime(remainTime);
// 红锁 3 - 3/2+1 = 1
int failedLocksLimit = failedLocksLimit();
...
实际获取锁的代码块:
...
if (waitTime == -1 && leaseTime == -1) {
// 逻辑会走这里 尝试获取锁
lockAcquired = lock.tryLock();
}
...
我们自此可以知道redLock.tryLock();
方法:
① 是需要手动释放锁的
只贴主要代码:
tryLock(waitTime, -1, unit);
即:
waitTime
:传入值,假设为7s
leaseTime
:-1
...
long remainTime = -1;
if (waitTime != -1) {
// remainTime = 7s
remainTime = unit.toMillis(waitTime);
}
//max(7/3=2, 1)=2
long lockWaitTime = calcLockWaitTime(remainTime);
// 1
int failedLocksLimit = failedLocksLimit();
...
实际请求锁的执行代码块:
...
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime = lockWaitTime = 2s, newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
小节: 需要手动释放锁。
只贴主要代码:
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
即:
waitTime
:前端传入值;假设为7s;
leaseTime
: -1
// lockWaitTime = remainTime = waitTime =7s
long lockWaitTime = calcLockWaitTime(remainTime);
// 0
int failedLocksLimit = failedLocksLimit();
实际获取锁的代码块:
...
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime = lockWaitTime = remainTime = 7s, newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
小节:需要手动释放锁;
只贴出关键代码:
tryLock(-1, -1, null);
即:waitTime=-1,leaseTime=-1
受影响的变量:
// lockWaitTime = remainTime = waitTime =-1s
long lockWaitTime = calcLockWaitTime(remainTime);
// 0
int failedLocksLimit = failedLocksLimit();
实际请求锁代码块:
...
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
}
...
小节:需要手动释放锁
相同点
waitTime
在红锁和联锁的场景下是一定有值,无参的情况下,单个实例默认时间是1500
毫秒;lock()
有参的情况下,Redis
会自动释放锁,而无参情况下,需要手动释放锁;tryLock()
方法都需要手动释放锁;tryLock()
方法都不会执行重试
机制;waitTime
,并且没有超过允许的失败次数),那么就会进行重试;(除tryLock()
方法)leaseTime
(不等于-1)的情况下,当业务程序没有跑完时,都会延长租约时间
:默认值30s
;不同点
红锁和联锁之所以会有上面的区别,因为在
红锁
算法思想:快速尝试获取锁,要是获取不到,就去获取下一个;只要获取的锁的数量大于一半即可。
方法 | 是否需要手动释放锁 | 是否有重试机制 | 是否可重入 | 看门狗是否会延长租约时间 |
---|---|---|---|---|
redLock.lock() | 需要 | 有 | 是 | 会 |
redLock.lock(leaseTime, TimeUnit.SECONDS) | 不需要 | 有 | 是 | 不会 (leaseTime=-1 会) |
multiLock.lock() | 需要 | 有 | 是 | 会 |
multiLock.lock(leaseTime, TimeUnit.SECONDS) | 不需要 | 有 | 是 | 不会 (leaseTime=-1 会) |
redLock.tryLock() | 需要 | 无 | 是 | 会 |
redLock.tryLock(long waitTime, TimeUnit unit) | 需要 | 有 | 是 | 会 |
multiLock.tryLock(waitTime, TimeUnit.SECONDS) | 需要 | 有 | 是 | 会 |
multiLock.tryLock() | 需要 | 无 | 是 | 会 |
参考地址:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。