赞
踩
当我们在设计分布式锁的时候,我们应该考虑分布式锁至少要满足的一些条件,同时考虑如何高效的设计分布式锁,这里我认为以下几点是必须要考虑的。
在分布式高并发的条件下,我们最需要保证,同一时刻只能有一个线程获得锁,这是最基本的一点。
在分布式高并发的条件下,比如有个线程获得锁的同时,还没有来得及去释放锁,就因为系统故障或者其它原因使它无法执行释放锁的命令,导致其它线程都无法获得锁,造成死锁。所以分布式非常有必要设置锁的有效时间,确保系统出现故障后,在一定时间内能够主动去释放锁,避免造成死锁的情况。
对于访问量大的共享资源,需要考虑减少锁等待的时间,避免导致大量线程阻塞。
所以在锁的设计时,需要考虑两点。
1、锁的颗粒度要尽量小
。比如你要通过锁来减库存,那这个锁的名称你可以设置成是商品的ID,而不是任取名称。这样这个锁只对当前商品有效,锁的颗粒度小。
2、锁的范围尽量要小
。比如只要锁2行代码就可以解决问题的,那就不要去锁10行代码了。
我们知道ReentrantLock是可重入锁,那它的特点就是:同一个线程可以重复拿到同一个资源的锁。重入锁非常有利于资源的高效利用。关于这点之后会做演示。
针对以上 Redisson都能很好的满足,下面就来分析下它。
为了更好的理解分布式锁的原理,我这边自己画张图通过这张图来分析。
线程去获取锁,获取成功: 执行 lua脚本,保存数据到 redis数据库。
线程去获取锁,获取失败: 一直通过 while循环尝试获取锁,获取成功后,执行 lua脚本,保存数据到 redis数据库。
在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。
但在实际开发中会有下面一种情况:
1 //设置锁1秒过去
2 redissonLock.lock("redisson", 1);
3 /**
4 * 业务逻辑需要咨询2秒
5 */
6 redissonLock.release("redisson");
7
8 /**
9 * 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了,
10 * 那么这个时候 线程2 进来了。那么就存在 线程1和线程2 同时在这段业务逻辑里执行代码,这当然是不合理的。
11 * 而且如果是这种情况,那么在解锁时系统会抛异常,因为解锁和加锁已经不是同一线程了,具体后面代码演示。
12 */
所以这个时候看门狗
就出现了,它的作用就是 线程1 业务还没有执行完,时间就过了,线程1 还想持有锁的话,就会启动一个 watch dog后台线程,不断的延长锁 key的生存时间。注意:正常这个看门狗线程是不启动的,还有就是这个看门狗启动后对整体性能也会有一定影响,所以不建议开启看门狗。
这个不用多说,主要是如果你的业务逻辑复杂的话,通过封装在 lua脚本中发送给redis,而且 redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性。
Redisson可以实现可重入加锁机制的原因,我觉得跟两点有关:
1、Redis存储锁的数据类型是 Hash类型
2、Hash数据类型的key值包含了当前线程信息。
下面是redis存储的数据
这里表面数据类型是 Hash类型,Hash类型相当于我们 java的 <key,<key1,value>>
类型,这里key是指 ‘redisson’
它的有效期还有9秒,我们再来看里们的 key值为 078e44a3-5f95-4e24-b6aa-80684655a15a:45
它的组成是:
guid + 当前线程的ID。后面的 value是就和可重入加锁有关。
举图说明
上面这图的意思就是可重入锁的机制,它最大的优点就是相同线程不需要在等待锁,而是可以直接进行相应操作。
Redis分布式锁会有个缺陷,就是在 Redis哨兵模式下:
客户端1
对某个master节点
写入了 redisson锁,此时会异步复制给对应的 slave节点。但是这个过程中一旦发生 master节点宕机,主备切换,slave节点从变为了 master节点。
这时客户端2
来尝试加锁的时候,在新的 master节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。
这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。
缺陷
在哨兵模式或者主从模式下,如果 master实例宕机的时候,可能导致多个客户端同时完成加锁。
public interface RLock extends Lock, RExpirable, RLockAsync
很明显 RLock是继承 Lock锁,所以他有 Lock锁的所有特性,比如lock、unlock、trylock等特性,同时它还有很多新特性:强制锁释放,带有效期的锁,。
这里针对上面做个整理,这里列举几个常用的接口说明
1 public interface RRLock { 2 //----------------------Lock接口方法----------------------- 3 4 /** 5 * 加锁 锁的有效期默认30秒 6 */ 7 void lock(); 8 /** 9 * tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false . 10 */ 11 boolean tryLock(); 12 /** 13 * tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间, 14 * 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。 15 * 16 * @param time 等待时间 17 * @param unit 时间单位 小时、分、秒、毫秒等 18 */ 19 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; 20 /** 21 * 解锁 22 */ 23 void unlock(); 24 /** 25 * 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过 26 * Thread.currentThread().interrupt(); 方法真正中断该线程 27 */ 28 void lockInterruptibly(); 29 30 //----------------------RLock接口方法----------------------- 31 /** 32 * 加锁 上面是默认30秒这里可以手动设置锁的有效时间 33 * 34 * @param leaseTime 锁有效时间 35 * @param unit 时间单位 小时、分、秒、毫秒等 36 */ 37 void lock(long leaseTime, TimeUnit unit); 38 /** 39 * 这里比上面多一个参数,多添加一个锁的有效时间 40 * 41 * @param waitTime 等待时间 42 * @param leaseTime 锁有效时间 43 * @param unit 时间单位 小时、分、秒、毫秒等 44 */ 45 boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; 46 /** 47 * 检验该锁是否被线程使用,如果被使用返回True 48 */ 49 boolean isLocked(); 50 /** 51 * 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有) 52 * 这个比上面那个实用 53 */ 54 boolean isHeldByCurrentThread(); 55 /** 56 * 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间 57 * @param leaseTime 锁有效时间 58 * @param unit 时间单位 小时、分、秒、毫秒等 59 */ 60 void lockInterruptibly(long leaseTime, TimeUnit unit); 61 }
RLock相关接口,主要是新添加了 leaseTime
属性字段,主要是用来设置锁的过期时间,避免死锁。
public class RedissonLock extends RedissonExpirable implements RLock
RedissonLock实现了 RLock接口,所以实现了接口的具体方法。这里我列举几个方法说明下
1 @Override
2 public void lock() {
3 try {
4 lockInterruptibly();
5 } catch (InterruptedException e) {
6 Thread.currentThread().interrupt();
7 }
8 }
发现 lock锁里面进去其实用的是lockInterruptibly
(中断锁,表示可以被中断),而且捕获异常后用 Thread.currentThread().interrupt()来真正中断当前线程,其实它们是搭配一起使用的。
具体有关 lockInterruptibly()方法讲解推荐一个博客。博客
:Lock的lockInterruptibly()
接下来执行流程,这里理下关键几步
1 /** 2 * 1、带上默认值调另一个中断锁方法 3 */ 4 @Override 5 public void lockInterruptibly() throws InterruptedException { 6 lockInterruptibly(-1, null); 7 } 8 /** 9 * 2、另一个中断锁的方法 10 */ 11 void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException 12 /** 13 * 3、这里已经设置了锁的有效时间默认为30秒 (commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()=30) 14 */ 15 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); 16 /** 17 * 4、最后通过lua脚本访问Redis,保证操作的原子性 18 */ 19 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { 20 internalLockLeaseTime = unit.toMillis(leaseTime); 21 22 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, 23 "if (redis.call('exists', KEYS[1]) == 0) then " + 24 "redis.call('hset', KEYS[1], ARGV[2], 1); " + 25 "redis.call('pexpire', KEYS[1], ARGV[1]); " + 26 "return nil; " + 27 "end; " + 28 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + 29 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 30 "redis.call('pexpire', KEYS[1], ARGV[1]); " + 31 "return nil; " + 32 "end; " + 33 "return redis.call('pttl', KEYS[1]);", 34 Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); 35 }
那么void lock(long leaseTime, TimeUnit unit)方法其实和上面很相似了,就是从上面第二步开始的。
接口的参数和含义上面已经说过了,现在我们开看下源码,这里只显示一些重要逻辑。
1 @Override 2 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { 3 long time = unit.toMillis(waitTime); 4 long current = System.currentTimeMillis(); 5 long threadId = Thread.currentThread().getId(); 6 Long ttl = tryAcquire(leaseTime, unit, threadId); 7 //1、 获取锁同时获取成功的情况下,和lock(...)方法是一样的 直接返回True,获取锁False再往下走 8 if (ttl == null) { 9 return true; 10 } 11 //2、如果超过了尝试获取锁的等待时间,当然返回false 了。 12 time -= System.currentTimeMillis() - current; 13 if (time <= 0) { 14 acquireFailed(threadId); 15 return false; 16 } 17 18 // 3、订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。 19 final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); 20 // 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。 21 // 只有await返回true,才进入循环尝试获取锁 22 if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { 23 if (!subscribeFuture.cancel(false)) { 24 subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { 25 @Override 26 public void operationComplete(Future<RedissonLockEntry> future) throws Exception { 27 if (subscribeFuture.isSuccess()) { 28 unsubscribe(subscribeFuture, threadId); 29 } 30 } 31 }); 32 } 33 acquireFailed(threadId); 34 return false; 35 } 36 37 //4、如果没有超过尝试获取锁的等待时间,那么通过While一直获取锁。最终只会有两种结果 38 //1)、在等待时间内获取锁成功 返回true。2)等待时间结束了还没有获取到锁那么返回false。 39 while (true) { 40 long currentTime = System.currentTimeMillis(); 41 ttl = tryAcquire(leaseTime, unit, threadId); 42 // 获取锁成功 43 if (ttl == null) { 44 return true; 45 } 46 // 获取锁失败 47 time -= System.currentTimeMillis() - currentTime; 48 if (time <= 0) { 49 acquireFailed(threadId); 50 return false; 51 } 52 } 53 }
[](javascript:void(0)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。