赞
踩
在多线程并发请求当中,为了保证我们的资源同一时刻只有一个线程进行操作(如商品超卖问题、购票系统等),我们通常要添加锁机制,如ReentrantLock,也就是可重入的互斥锁,与synchronized功能类似,因为比较灵活,所以经常使用。这在单机情况下是没有问题的,但在多节点的情况下,也就意味着有多个进程,ReentrantLock锁机制可能就会不起作用,所以我们需要一种能够跨进程的锁,也就是同一时刻只能让一个进程获取锁,来控制共享资源的访问。
核心也是使用了每个节点都会用到的第三方组件,例如mysql、redis、zookeeper
使用setnx命令,在 Redis 中,setnx 命令是可以帮助我们实现互斥,setnx 即 set if not exists (对应 Java 中的 setIfAbsent 方法),如果 key 不存在的话,会设置 key 的值,如果 key 已经存在, 则啥也不做。
if(redisTemplate.opsForValue().setIfAbsent(key, value , time, TimeUnit)){ //加锁
try {
do something //业务处理
}catch(){
}
finally {
// 释放锁
String delVal = valueOperations.get(key).toString();
if (value.equals(delVal)){
redisTemplate.delete(key);
}
}
}
通常情况下我们一般使用setnx
+ expire
来实现防止死锁,但仍然会有锁被别的线程误删的问题(查询,删除不是一个原子操作,会有并发问题)
为什么会有锁被别的线程误删?假如线程A和线程B都执行同一段代码进行加锁,线程A加锁成功,当出现业务执行时间过长,超过了过期时间,这时线程A释放了锁,此时线程B就能加锁成功,接下来执行线程B业务操作,这个时候线程A业务操作执行完了,在finally方法中执行delete key,这个时候线程A就会把线程B的锁给释放了。
所以一般释放的锁的时候,最好使用lua脚本来进行释放,来实现原子性的查询,比较,并删除锁。
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end;
lua脚本的话可以保证我们执行的时候多个命令执行期间不回被其他线程打断,或出现竞争状态,也就是可以看作一次请求,保证了我们命令的原子性。
但这个方案仍然有个缺点:锁过期释放了,业务还没执行完。对于可能存在锁过期释放,业务没执行完的问题。我们可以稍微把锁过期时间设置长一些,让其大于正常业务处理时间。如果你觉得不是很稳,还可以给获得锁的线程,开启一个定时守护线程,每隔一段时间检查锁是否还存在,存在则对锁的过期时间延长,防止锁过期提前释放(如每5秒查看一下锁的过期时间,如果小于10秒,就延期),针对这种锁续约机制,redission框架就帮我们解决了这个问题
首先redission使用方式也比较简单
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.0</version>
</dependency>
@Bean
public RedissonClient redissonClient(){
...
// 添加redis地址
// 设置锁的超时时间
// 创建 RedissonClient 对象
}
@Autowired
private RedissonClient redissonClient;
public void test() throws InterruptedException {
RLock lock = redissonClient.getLock("anyLock");
boolean locked = lock.tryLock(1,10,TimeUnit.SECONDS);// 参数:1.获取锁的最大等待时间(期间会重试),2.锁自动释放时间,3.时间单位
if(locked){
try{
// 业务操作
}finally{
//释放锁
lock.unlock();
}
}
}
假如我们加锁没有传参数直接使用tryLock(),Redisson则会设置默认的锁过期时间为30s,并且如果任务超过了30s还没有执行完毕,则后台会有一个线程,默认没隔10s执行task,重置过期时间,也就是WatchDog机制
redisson看门狗自动续期源码
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { if (leaseTime != -1) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
tryAcquireAsync方法:
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); // 将线程放入缓存操作 ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { // 如果已经有该线程,则不再延期 oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); renewExpiration(); } } //--------------------------------------------------------------- private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { // 缓存不存在,则不再续约 return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 执行续约的lua脚本 RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { // 延期成功,回调自己,继续续约 renewExpiration(); } }); } // 每隔internalLockLeaseTime/3=10秒检查一次 }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); } // ----------------------------------------------------------------------- protected RFuture<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
关键方法:renewExpiration()
最后关于Redisson:
Redisson是Java的redis客户端之一,提供了一些api方便操作redis。锁只是它的一个工具类,其他还包括分布式对象、分布式集合等等,详细可参考:https://github.com/redisson/redisson/wiki/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。