赞
踩
在分布式系统中,多个进程或节点可能同时尝试访问共享资源或数据。
为了确保数据的一致性和避免竞态条件,需要一种机制来协调对这些资源的访问。
常见的分布式实现有:基于数据库,redis,zookeeper
本文着重介绍基于redis的 Redisson 客户端作为一种分布式锁的解决方案
版本:Redission3.17.6
Redisson 是一个基于 Java 的 Redis 客户端,它提供了一些高级功能,使得在 Java 中使用 Redis 变得更加简单和方便。
Redisson 的基本概念和功能包括:
分布式锁:Redisson 提供了一种简单而强大的方式来实现分布式锁。
它支持多种锁模式,如公平锁、可重入锁、读写锁等,并且提供了锁的超时设置和自动释放功能。
缓存:Redisson 提供了一个强大的缓存实现,可以将数据存储在 Redis 中,并提供了数据过期、缓存刷新、缓存同步等功能。
队列和列表:Redisson 提供了对 Redis 的列表和队列的高级操作,如阻塞队列、延迟队列、循环队列等。
发布/订阅:Redisson 提供了发布/订阅功能,允许你在分布式系统中实现消息发布和订阅。
映射:Redisson 提供了对 Redis 中的映射(Hash)的高级操作,如存储对象、获取对象、设置对象的过期时间等。
在Redisson中常见获取锁的方式有
lock() 方法 与 tryLock() 方法
我们先来阐述两者的区别,再分析它们的源码。
(1)返回值: lock() 是没有返回值的;tryLock() 的返回值是 boolean。
(2)时机:lock() 一直等锁释放;tryLock() 获取到锁返回true,获取不到锁并直接返回false。
(3)tryLock() 是可以被打断的,被中断的;lock是不可以。
lock方法加锁代码
一般如下这样
// 使用 lock() 方法获取锁
RLock lock = client.getLock("myLock");
lock.lock();
try {
System.out.println("获取锁成功,执行被保护的代码...");
} finally {
lock.unlock();
}
tryLock方法加锁代码
一般如下这样
// 使用 tryLock() 方法获取锁
lock = client.getLock("myLock");
if (lock.tryLock()) {
try {
System.out.println("获取锁成功,执行被保护的代码...");
} finally {
lock.unlock();
}
} else {
System.out.println("获取锁失败,无法执行被保护的代码...");
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { // 获取当前线程ID long threadId = Thread.currentThread().getId(); // 调用 tryAcquire 方法尝试获取锁,等待时间-1、 // 指定的租约时间、时间单位以及线程ID。如果成功获取到锁(即返回值为null),则直接返回。 Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } // 如果没拿到锁则订阅当前线程ID相关的锁信息,并设置超时回调 CompletableFuture<RedissonLockEntry> future = subscribe(threadId); pubSub.timeout(future); RedissonLockEntry entry; // 判断是否是可中断的 if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { // 死循环尝试获取锁 ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { // entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(entry, threadId); } // get(lockAsync(leaseTime, unit)); }
//waitTime: 表示在获取锁失败时,线程最多等待的时间。 //leaseTime: 表示成功获取锁后的租约时间,即锁的有效期。 //unit: leaseTime参数的时间单位,如秒(SECONDS)、毫秒(MILLISECONDS)等。 //threadId: 当前线程的唯一标识符 private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Boolean> acquiredFuture; // 判断有效期是否大于0 if (leaseTime > 0) { // 以方法设置的为准 acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } else { // 使用内部预设的internalLockLeaseTime作为锁的租期去尝试获取锁。 // 此处获取的是Config#lockWatchdogTimeout的值30*1000也就是默认30秒 acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> { // lock acquired if (acquired) { // 判断租期是否大于0 if (leaseTime > 0) { //将外部传的锁租期时间赋给内部锁的租期时间 internalLockLeaseTime = unit.toMillis(leaseTime); } else { // 说明这个锁没有租期,则开启一个线程自动为这个锁续期,确保在过期前得以自动续组 scheduleExpirationRenewal(threadId); } return acquired; }); return new CompletableFutureWrapper<>(f); }
接下来着重介绍一下
RedissionLcock#tryAcquireAsync()方法
//方法接受四个参数:waitTime(等待时间)、leaseTime(锁的租期)、unit(时间单位)和threadId(线程ID)。 private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; // 判断锁是否有租期,并尝试调用tryLockInnerAsync获取锁得到一个表示锁剩余时间(ttlRemaining)的RFuture对象。 // 然后通过调用thenApply方法对这个异步结果进行进一步处理: if (leaseTime > 0) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { // 使用内部预设的internalLockLeaseTime作为锁的租期去尝试获取锁。 // 此处获取的是Config#lockWatchdogTimeout的值30*1000也就是默认30秒 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // 说明锁拿到了 if (ttlRemaining == null) { // 判断租期是否大于0 if (leaseTime > 0) { //将外部传的锁租期时间赋给内部锁的租期时间 internalLockLeaseTime = unit.toMillis(leaseTime); } else { // 说明这个锁没有租期,则开启一个线程自动为这个锁续期,确保在过期前得以自动续组 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); }
主要加锁逻辑在RedissionLcock#tryLockInnerAsync()方法
//该方法接受锁等待时间、锁租用时间、时间单位、线程ID和Redis命令作为参数 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); // 这段lua脚本接收三个参数 // KEYS[1]:锁的名称 // ARGV[1]:锁的过期时间 // ARGV[2]:加锁的唯一标识,由Redission客户端ID+当前线程id组成。 }
如果加锁成功就会返回null,否则返回有效期。
这是redission实现分布式锁的核心方法,它采用了一段lua脚本
lua脚本在redis中执行是原子性的, 也就是说当两个lua脚本并发调用的时候,只会同时执行一个。
lua脚本的好处和redis提供的pipeline相比, 不仅是都能一次能执行很多命令, 而且相比于pipeline能够保证原子性。
可能很多不懂redis命令的同学一开始没懂这段脚本做了啥,下面是简化版的java版本
if(判断是否不存在key名称为xxxx的key){ //0说明不存在 1说明存在 // hincrby命令用于对hash类型数据中的某个字段的值进行增加操作 //类似于hashmap的put hashmap.put("xxxx","1"); //设置key有效期。并返回null return null; } if(判断hash结构中的key中是否存在某个值){ //存在 // hincrby命令用于对hash类型数据中的某个字段的值进行增加操作 //对当前key对应的值进行累加。 hashmap.put("xxxx",已存储的值+1); //设置key有效期。并返回null return null; } //如果前两个都不符合,说明redis已有该key ,则返回该key的有效期 return 返回xxxx的有效期
从RedissionLcock#tryAcquireAsync()方法的分支判断
// 如果有效期大于0 则会使用方法调用时设置的有效期
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 如果小于0
// 使用内部预设的internalLockLeaseTime作为锁的租期去尝试获取锁。
// 此处获取的是Config#lockWatchdogTimeout的值30*1000也就是默认30秒
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
而由于小于0 redisson会单独开启一个线程为当前线程的锁去定期续订这个有效时间
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
watch dog 在当前线程存活时每( lockWatchdogTimeout/3)10s给分布式锁的key续期 30s;
watch dog 机制启动时如果代码中没有释放锁操作时,watch dog 会不断的给锁续期;
如果程序释放锁操作时因为异常没有被执行,那么锁无法被释放,所以释放锁操作一定要放到 finally {} 中;
要使 watchLog机制生效 那么锁的有效期就不要指定。
所谓可重入指的是同一个线程可以多次获取同一个锁。
每次获取锁时,持有计数会增加,而释放锁时,持有计数会减少。
这样,即使同一个线程多次获取锁,只要它在每次释放锁时正确地减少持有计数,就可以保证锁的安全性和正确性。
RedissonLock利用了Redis的hash数据结构来实现持有计数的管理,确保了锁的可重入性。
参考链接
一文讲透分布式锁安全性问题
Redis分布式锁实现Redisson 15问
Redission实现分布式锁
Redisson之lock()和tryLock()的区别及说明
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。