当前位置:   article > 正文

Redis面试题分享三十四:使用过 Redisson 嘛?说说它的原理_redisson原理面试

redisson原理面试

目录

一、Redisson原理分析

1、加锁机制

2、watch dog自动延期机制

3、为啥要用 lua脚本呢?

4、可重入加锁机制

5、Redis分布式锁的缺点

二、RLock

1、加锁操作

 2、减锁操作

 3、锁续约

4、流程概括 

二、总结


一、Redisson原理分析

为了更好的理解分布式锁的原理,我这边自己画张图通过这张图来分析。
img

1、加锁机制

线程去获取锁,获取成功: 执行 lua脚本,保存数据到 redis数据库。

线程去获取锁,获取失败: 一直通过 while循环尝试获取锁,获取成功后,执行 lua脚本,保存数据到 redis数据库。

2、watch dog自动延期机制

在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。

但在实际开发中会有下面一种情况:

  1. 1 //设置锁1秒过去
  2. 2 redissonLock.lock("redisson", 1);
  3. 3 /**
  4. 4 * 业务逻辑需要咨询2秒
  5. 5 */
  6. 6 redissonLock.release("redisson");
  7. 7
  8. 8 /**
  9. 9 * 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了,
  10. 10 * 那么这个时候 线程2 进来了。那么就存在 线程1和线程2 同时在这段业务逻辑里执行代码,这当然是不合理的。
  11. 11 * 而且如果是这种情况,那么在解锁时系统会抛异常,因为解锁和加锁已经不是同一线程了,具体后面代码演示。
  12. 12 */

所以这个时候看门狗就出现了,它的作用就是 线程1 业务还没有执行完,时间就过了,线程1 还想持有锁的话,就会启动一个 watch dog后台线程,不断的延长锁 key的生存时间。注意:正常这个看门狗线程是不启动的,还有就是这个看门狗启动后对整体性能也会有一定影响,所以不建议开启看门狗。

3、为啥要用 lua脚本呢?

这个不用多说,主要是如果你的业务逻辑复杂的话,通过封装在 lua脚本中发送给redis,而且 redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性


Redis是在2.6推出了脚本功能,允许开发者使用Lua语言编写脚本传到redis执行。使用脚本的好处如下:

1、减少网络开销:本来5次网络请求的操作,可以用一个请求完成,原先5次请求的逻辑,可以一次性放到redis中执行,较少了网络往返时延。这点跟管道有点类似

2、原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。管道不是原子的,不过redis的批量操作命令(类似mset)是原子的

也就意味着虽然脚本中有多条redis指令,那即使有多条线程并发执行,在同一时刻也只有一个线程能够执行这段逻辑,等这段逻辑执行完,分布式锁也就获取到了,其它线程再进来就获取不到分布式锁了。

4、可重入加锁机制

Redisson可以实现可重入加锁机制的原因,我觉得跟两点有关:

1、Redis存储锁的数据类型是 Hash类型
2、Hash数据类型的key值包含了当前线程信息。

下面是redis存储的数据
img

这里表面数据类型是 Hash类型,Hash类型相当于我们 java的 <key,<key1,value>> 类型,这里key是指 ‘redisson’

它的有效期还有9秒,我们再来看里们的 key值为 078e44a3-5f95-4e24-b6aa-80684655a15a:45它的组成是:

guid + 当前线程的ID。后面的 value是就和可重入加锁有关。

举图说明

img

上面这图的意思就是可重入锁的机制,它最大的优点就是相同线程不需要在等待锁,而是可以直接进行相应操作。

5、Redis分布式锁的缺点

Redis分布式锁会有个缺陷,就是在 Redis哨兵模式下:

客户端1 对某个master节点写入了 redisson锁,此时会异步复制给对应的 slave节点。但是这个过程中一旦发生 master节点宕机,主备切换,slave节点从变为了 master节点。

这时客户端2 来尝试加锁的时候,在新的 master节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。

这时系统在业务语义上一定会出现问题,导致各种脏数据的产生

缺陷在哨兵模式或者主从模式下,如果 master实例宕机的时候,可能导致多个客户端同时完成加锁。

二、RLock

1、加锁操作

RLockRedisson分布式锁的最核心接口,继承了concurrent包的Lock接口和自己的RLockAsync接口,RLockAsync的返回值都是RFuture,是Redisson执行异步实现的核心逻辑,也是Netty发挥的主要阵地。

RLock进入,找到RedissonLock类,找到 tryLock 方法再递进到干活的tryAcquireOnceAsync 方法,这是加锁的主要代码

  1. private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  2. if (leaseTime != -1L) {
  3. return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  4. } else {
  5. RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  6. ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
  7. if (e == null) {
  8. if (ttlRemaining) {
  9. this.scheduleExpirationRenewal(threadId);
  10. }
  11. }
  12. });
  13. return ttlRemainingFuture;
  14. }
  15. }

此处出现leaseTime时间判断的2个分支,实际上就是加锁时是否设置过期时间,未设置过期时间(-1)时则会有watchDog 的锁续约 (下文),一个注册了加锁事件的续约任务。我们先来看有过期时间tryLockInnerAsync 部分,

evalWriteAsync是eval命令执行lua的入口

  1. <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  2. this.internalLockLeaseTime = unit.toMillis(leaseTime);
  3. return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
  4. }
 2、减锁操作

这里只有一个明显的监听方法onMessage,其订阅和信号量的释放都在父类PublishSubscribe,我们只关注监听事件的实际操作

  1. protected void onMessage(RedissonLockEntry value, Long message) {
  2. Runnable runnableToExecute;
  3. if (message.equals(unlockMessage)) {
  4. // 从监听器队列取监听线程执行监听回调
  5. runnableToExecute = (Runnable)value.getListeners().poll();
  6. if (runnableToExecute != null) {
  7. runnableToExecute.run();
  8. }
  9. // getLatch()返回的是Semaphore,信号量,此处是释放信号量
  10. // 释放信号量后会唤醒等待的entry.getLatch().tryAcquire去再次尝试申请锁
  11. value.getLatch().release();
  12. } else if (message.equals(readUnlockMessage)) {
  13. while(true) {
  14. runnableToExecute = (Runnable)value.getListeners().poll();
  15. if (runnableToExecute == null) {
  16. value.getLatch().release(value.getLatch().getQueueLength());
  17. break;
  18. }
  19. runnableToExecute.run();
  20. }
  21. }
  22. }

发现一个是默认解锁消息 ,一个是读锁解锁消息 ,因为redisson是有提供读写锁的,而读写锁读读情况和读写、写写情况互斥情况不同,我们只看上面的默认解锁消息unlockMessage分支

LockPubSub监听最终执行了2件事

  • runnableToExecute.run() 执行监听回调
  • value.getLatch().release(); 释放信号量

Redisson通过LockPubSub 监听解锁消息,执行监听回调和释放信号量通知等待线程可以重新抢锁。
这时再回来看tryAcquireOnceAsync另一分支

  1. private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  2. if (leaseTime != -1L) {
  3. return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  4. } else {
  5. RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  6. ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
  7. if (e == null) {
  8. if (ttlRemaining) {
  9. this.scheduleExpirationRenewal(threadId);
  10. }
  11. }
  12. });
  13. return ttlRemainingFuture;
  14. }
  15. }
 3、锁续约

查看RedissonLock.this.scheduleExpirationRenewal(threadId)

  1. private void scheduleExpirationRenewal(long threadId) {
  2. RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
  3. RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
  4. if (oldEntry != null) {
  5. oldEntry.addThreadId(threadId);
  6. } else {
  7. entry.addThreadId(threadId);
  8. this.renewExpiration();
  9. }
  10. }
  11. private void renewExpiration() {
  12. RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
  13. if (ee != null) {
  14. Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
  15. public void run(Timeout timeout) throws Exception {
  16. RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
  17. if (ent != null) {
  18. Long threadId = ent.getFirstThreadId();
  19. if (threadId != null) {
  20. RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
  21. future.onComplete((res, e) -> {
  22. if (e != null) {
  23. RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
  24. } else {
  25. if (res) {
  26. RedissonLock.this.renewExpiration();
  27. }
  28. }
  29. });
  30. }
  31. }
  32. }
  33. }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
  34. ee.setTimeout(task);
  35. }
  36. }

拆分来看,这段连续嵌套且冗长的代码实际上做了几步:

  • 添加一个nettyTimeout回调任务,每(internalLockLeaseTime / 3)毫秒执行一次,执行的方法是renewExpirationAsync
  • renewExpirationAsync重置了锁超时时间,又注册一个监听器,监听回调又执行了renewExpiration

renewExpirationAsync 的Lua如下

  1. protected RFuture<Boolean> renewExpirationAsync(long threadId) {
  2. return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
  3. }
  4. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
  5. redis.call('pexpire', KEYS[1], ARGV[1]);
  6. return 1;
  7. end;
  8. return 0;

重新设置了超时时间。
Redisson加这段逻辑的目的是什么?
目的是为了某种场景下保证业务不影响,如任务执行超时但未结束,锁已经释放的问题。
当一个线程持有了一把锁,由于并未设置超时时间leaseTimeRedisson 默认配置了30S,开启watchDog,每10S对该锁进行一次续约,维持30S的超时时间,直到任务完成再删除锁。
这就是Redisson的锁续约 ,也就是WatchDog 实现的基本思路。

4、流程概括 

通过整体的介绍,流程简单概括:

  • A、B线程争抢一把锁,A获取到后,B阻塞
  • B线程阻塞时并非主动 CAS,而是PubSub方式订阅该锁的广播消息
  • A操作完成释放了锁,B线程收到订阅消息通知
  • B被唤醒开始继续抢锁,拿到锁

详细加锁解锁流程总结如下图:

二、总结

分布式锁的整体实现很巧妙,借助lua脚本的原子性,实现了很多功能,当然redisson还有其它很多功能,比如为了解决主从集群中的异步复制会导致锁丢失问题,引入了redlock机制,还有分布式下的可重入锁等。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号