当前位置:   article > 正文

分布式锁:红锁redLock和联锁multiLock源码分析

multilock

概述

为了防止分布式系统中的多个进程之间相互干扰,我们需要一种分布式协调技术来对这些进程进行调度。

而这个分布式协调技术的核心就是来实现分布式锁

分布式锁应该具备哪些条件

  • 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
  • 高可用的获取锁与释放锁
  • 高性能的获取锁与释放锁
  • 具备可重入特性(可理解为重新进入,由多于一个任务并发使用,而不必担心数据错误)
  • 具备锁失效机制,防止死锁
  • 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

分布式锁有很多,常用的有MySQL的、zookeeper的和Redis的
今天我就来分析分析Redis分布式锁中的红锁联锁

重点分析的红锁,联锁只是顺便;

Redis分布式锁的本质

在讲解之前,先给出,获取Redis分布式锁的本质:

Redis获取分布式锁,其实就是向NRedis实例中使用SETNX来对该resource设置键值。

解锁:就是反向操作,删除掉resource的键值;

一开始只是想研究红锁源码,但是发现红锁源码是基于联锁源码的,
所以联锁的情况一起分析了下;

应用场景

一般来说:“分布式锁”是用来解决分布式应用中“并发冲突”
我们来看个例子:

在这里插入图片描述

上面是一个股票自选股的页面,window电脑快捷键中:

insert键可以添加一只股票到自选股中

delete键可以在自选股中删除一只股票

现在有人快速的对某只股票狂按这两个快捷键:比如一秒内按了三个来回

后端接口判断逻辑很简单:

判断添加的股票是否已经在自选股中;

① 不在:添加;

② 在:不做任何操作;(实际情况会有业务操作,比如排序)

这个时候会很容易出现并发问题:自选股重复添加问题;
比如:

①添加操作请求过来时,还没有执行完;

②删除请求也来了,但是它执行完了;

③而后又有一个 添加请求来了;

④此时两个添加请求操作都判定自选股里没有这个股票,就会出现重复添加的问题;

这里只考虑依靠代码来解决并发问题,不考虑数据库唯一键的问题;
实际上,因为采用的是mongodb数据库,自选股的数据是存放在数组里,也没办法设置唯一键;

如果服务部署的是单例,只需要在代码关键地方加个锁就行了;
但是现在服务大多都是集群部署;有多个节点;这个时候就需要分布式锁来解决问题;

前置知识

这里先了解一个概念;

租约时间(leaseTime): Rediskey的过期时间;

等待时间(waitTime): 请求获取锁的等待时间;

什么是红锁?

红锁算法思想
为了取到锁,客户端应该执行以下操作:

  1. 获取当前Unix时间,以毫秒为单位。
  2. 依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
  3. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
  4. 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
  5. 如果因为某些原因,获取锁失败(没有在至少N/2+1Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。

官方地址:http://redis.cn/topics/distlock.html

什么是联锁?

联锁的思想很简单:N个实例都必须获取到锁,有一个失败,即为失败。

分析源码

Java11

Redisson: 3.1.3.6

为了方便分析代码,假设集群里三个Redis实例;

起始调用代码:

long leaseTime = 3L;
lock.lock(leaseTime, TimeUnit.SECONDS);
RedissonRedLock redLock = new RedissonRedLock(lock);
// 有参数的情况
redLock.lock(leaseTime, TimeUnit.SECONDS);
  • 1
  • 2
  • 3
  • 4
  • 5

这里是以redLock.lock(leaseTime, TimeUnit.SECONDS)方法为入口,开始分析的;

redLock.lock(leaseTime, TimeUnit.SECONDS)

源码:

/**
 * leaseTime租约时间,也就是键key的过期时间
 */
public void lock(long leaseTime, TimeUnit unit) {
    try {
        lockInterruptibly(leaseTime, unit);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

接着会调用lockInterruptibly()方法:

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
	// 以毫秒来算的话,1500==1.5s * 3 = 4.5s
	// 默认等待时间为4.5s
    long baseWaitTime = locks.size() * 1500;
    long waitTime = -1;
    if (leaseTime == -1) {
    	// 当没有设置租约时间时,waitTime 等于默认4.5s
        waitTime = baseWaitTime;
    } else {
    	// 设置了租约时间的情况
    	// 将前端传入的租约时间转为毫秒
        leaseTime = unit.toMillis(leaseTime);
        // 租约时间赋值给waitTime
        waitTime = leaseTime;
        if (waitTime <= 2000) {
        	// 小于 2s情况
            waitTime = 2000;
        } else if (waitTime <= baseWaitTime) {
        	// ThreadLocalRandom.current() 多线程下生成随机数
        	// 假设传入3s,那么就会在(3/2=1s, 3s)之间产生随机数赋值给waitTime
            waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
        } else {
            // 假设传入6s,那么就会在(4.5s, 6s)之间产生随机数赋值给waitTime
            waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
        }
    }
    // 假设leaseTime传入的是3s, 根据随机策略(1s, 3s),假设生成随机数是2s
    while (true) {
    	// waitTime = 2s, leaseTime = 3s
        if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
        	// 可以看出lock方法只能成功,才会生成退出。否则就得报异常退出
            return;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

通过上面分析,可以知道lockInterruptibly()方法主要做的事情:

① 生成一个小于leaseTime的随机数赋值给waitTime

② 如果没有设置leaseTime,那么就用默认值赋值给waitTime;

③这个方法只有在外界不传入waitTime参数时,才会被调用;

Q:这里不由得思考下,为什么这里采用了随机数?
该代码块在lock()方法被调用的情况下就会执行,而该方法是没有waitTime参数的,只有leaseTime参数;

A: 最直观的 解释是因为tryLock需要这个参数;
leaseTime:这个字段是租约时间,也就是键key的过期时间。
为了保证获取到锁时,键key没过期,waitTime的值无论怎么生成都是小于leaseTime值的;
又因为前端没办法传入waitTime,通常策略就是随机生成指定范围的数;

接着我们分析:tryLock(waitTime, leaseTime, unit) 源码:

// waitTime = 2s, leaseTime = 3s, 时间单位毫秒
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1;
    // 设置了租约时间的情况
    // 这段逻辑 就是对有租约时间的情况下,设置newLeaseTime,实际获取分布式锁时用到
    if (leaseTime != -1) {
        if (waitTime == -1) {
            newLeaseTime = unit.toMillis(leaseTime);
        } else {
        	// 设置了等待时间的场景
        	// 根据我假设的情况,这里2 * 2 = 4s;
        	// 这里乘以2,个人认为,从申请获取锁到真正获取到锁是有时间消耗的,
        	// 为了防止获取到的锁不至于立马过期,所以乘以2,其实我觉得leaseTime*2也可以;
        	// 因为从代码最后释放锁的逻辑来看,这里的租约时间多长,并不会影响最后锁的统一释放
            newLeaseTime = unit.toMillis(waitTime)*2;
        }
    }
    // 当前时间 毫秒
    long time = System.currentTimeMillis();
    // 总的等待时间
    long remainTime = -1;
    if (waitTime != -1) {
    	// 因为waitTime=2s,所以remainTime=2s
        remainTime = unit.toMillis(waitTime);
    }
    // 这里红锁代码重写了calcLockWaitTime()方法
    // 根据下面代码calcLockWaitTime分析,此时lockWaitTime=1s;
    // 计算每个锁的等待时间  在联锁的场景下,就等于remainTime
    long lockWaitTime = calcLockWaitTime(remainTime);
    // 允许获取锁失败的次数 在联锁的场景下,固定为0
    int failedLocksLimit = failedLocksLimit();
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    // 循环每个redis客户端,去获取锁
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
            	// 红锁的情况中,根据计算规则肯定是取lockWaitTime
            	// lockWaitTime = 1s, remainTime=2s 取最小值 1s
                long awaitTime = Math.min(lockWaitTime, remainTime);
                // awaitTime=1s, newLeaseTime=4s 开始尝试获取锁
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            lockAcquired = false;
        }
        
        if (lockAcquired) {
        	// 如果获取到了,就记录起来
            acquiredLocks.add(lock);
        } else {
        	// 失败的话,比较下是否到了失败次数
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
				// 3 - 2 也就是说 只有在成功两个,失败一个情况下,才会执行这里
                // 换句话说,红锁的场景下走这里,联锁场景下一定不执行这里
                break;
            }
            
            // 重试机制
            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1) {
                    return false;
                }
                // 重置失败次数、锁列表、遍历游标,这说明要进行重试了
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
            	// 红锁失败场景执行,因为联锁的场景中failedLocksLimit=0
                failedLocksLimit--;
            }
        }
        // 超时控制代码块
        if (remainTime != -1) {
        	// System.currentTimeMillis() - time 单个Redis实例获取锁花费的时间
         	// remainTime = remainTime - System.currentTimeMillis() - time;
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
            	// 从这段逻辑可以看出 remainTime 就是总的等待时间,如果超过了,还没有走出循环,说明获取锁失败
            	// 对已经获取到的锁进行释放
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }
	
	// leaseTime = 3s
	// 下面逻辑是,key到了过期时间后,Redis利用异步线程进行删除,释放锁;
    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        // 设置好过期时间
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
        // 同步可中断的方式来释放到期锁
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
    
    return true;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114

代码具体干的事情,已有注释;
我们从中知道,实际获取锁的方法是lock.tryLock(...),在调用该方法之前,大体的流程图如下:

在这里插入图片描述

从流程图中我们知道:

① 目的就是为了计算出三个变量值:newLeaseTimeawaitTimefailedLocksLimit
前两个是用于实际获取锁的方法参数,最后一个是为了控制重试机制;

② 通过源码分析,联锁也是上面的流程,唯一的区别就是图中标红的部分;也就是在获取awaitTimefailedLocksLimit这两个参数的逻辑上;

这种区别Redis源码是利用两个重载方法来实现的;
(这两个重载的方法:calcLockWaitTime(...)failedLocksLimit(...)

红锁情况下,重写的方法 calcLockWaitTime(...)

// 红锁重写
// 根据我的假设,此时remainTime=2s,locks.size()=3
@Override
protected long calcLockWaitTime(long remainTime) {
	// Math.max(2 / 3, 1) = (0, 1) = 1
	// 说明:按照下面算法,应该是每个实例等待时间和1进行比较,取最大值
    return Math.max(remainTime / locks.size(), 1);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

联锁 直接返回remainTime,也就是传入什么就返回什么;

红锁情况下,重写的方法 failedLocksLimit

// 锁可以失败的次数,锁的数量-锁成功客户端最小的数量
protected int failedLocksLimit() {
    return locks.size() - minLocksAmount(locks);
}
  • 1
  • 2
  • 3
  • 4

联锁是直接返回0

③ 重试机制,当在遍历Redis实例获取锁失败的情况后,如果没有超时(即<waitTime),那么Redis会自动进行重试,再次来一遍,尝试获取锁。

④ 如果到了超时时间,那么是直接return false结束掉;

⑤ 如果获取锁成功,并设置了leaseTime那么Redis会自动释放锁;

大体的流程如下:

在这里插入图片描述

实际请求锁的关键代码分析

// 假设waitTime=1s, leaseTime=3s,单位毫秒
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
	// 设置了租约时间的情况
    if (leaseTime != -1) {
    	// tryLockInnerAsync 真正加锁的方法,其会调用脚本执行redis命令插入key
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 没有设置租约时间的情况
    // 程序(看门狗)会设置一个默认值为30s的租约时间
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired  获取锁成功
        if (ttlRemaining == null) {
        	// 对该设置时间轮询器,也可以理解为监听器
        	// 该监听器会以租约时间的三分之一的频率,不断延迟租约时间
        	// 目的是为了防止,业务程序还没有跑完,锁就被释放掉了。
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

scheduleExpirationRenewal(threadId)的源码:

private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
    	// 说明该线程已经获取到锁了,
    	// 内部变量值会加1,即:可重入锁
        oldEntry.addThreadId(threadId);
    } else {
    	// 该线程第一次获取到该锁
        entry.addThreadId(threadId);
        // 启动时间轮询器,开始每隔leaseTime/3的时间,不断去延长租约时间
        renewExpiration();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

renewExpiration()的源码如下:

private void renewExpiration() {
   ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
   if (ee == null) {
       return;
   }
   // newTimeout 底层使用的是netty
   // 
   Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
       @Override
       public void run(Timeout timeout) throws Exception {
           ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
           if (ent == null) {
               return;
           }
           Long threadId = ent.getFirstThreadId();
           if (threadId == null) {
               return;
           }
           
           RFuture<Boolean> future = renewExpirationAsync(threadId);
           future.onComplete((res, e) -> {
               if (e != null) {
               		// 有异常,结束
                   log.error("Can't update lock " + getName() + " expiration", e);
                   return;
               }
               
               if (res) {
                   // reschedule itself 
                   // 递归调用
                   renewExpiration();
               }
           });
       }
       // 定时器的时间为租约时间的三分之一
   }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
   
   ee.setTimeout(task);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

大体的流程如下:

在这里插入图片描述

从上面的代码分析我们知道:

① 为了防止业务程序还没有跑完,锁就被释放掉了,Redisson底层利用netty实现了一个时间轮询器, 触发的频率为租约时间的三分之一,
来给没有设置租约时间的场景进行自动延长租约时间;

② 红锁和联锁都是可重入锁;

加锁和解锁的Redis命令

@Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
    						// 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),
    						// 并通过pexpire设置失效时间(也是锁的租约时间)
                            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                            "if (mode == false) then " +
                              "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                              "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                              "redis.call('set', KEYS[2] .. ':1', 1); " +
                              "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
                              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                              "return nil; " +
                            "end; " +
                            // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
                            "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                              "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                              "local key = KEYS[2] .. ':' .. ind;" +
                              "redis.call('set', key, 1); " +
                              "redis.call('pexpire', key, ARGV[1]); " +
                              "local remainTime = redis.call('pttl', KEYS[1]); " +
                              "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
                              "return nil; " +
                            "end;" +
                            // 获取分布式锁的KEY的失效时间毫秒数
                            "return redis.call('pttl', KEYS[1]);",
                            // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
                    Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), 
                    internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

获取锁的命令中,

KEYS[1]就是Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)),表示分布式锁的key

这里个getName()方法返回的是name,这个name是构造RLock lock = redissonClient.getLock(name);传进去的。

ARGV[1]就是internalLockLeaseTime,即锁的租约时间,默认30s

ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值;

释放锁:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    // 释放锁时需要在redis实例上执行的lua命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 如果分布式锁KEY不存在,那么向channel发布一条消息
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 如果就是当前线程占有分布式锁,那么将重入次数减1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

这里是参考:Redlock:Redis分布式锁最牛逼的实现


自此大部分的代码都分析完了;

下面自己又把其他几个类似的方法,一起人肉分析了一遍;

为了不重复贴代码,只贴关键的部分

redLock.lock()

起始调用代码:

long leaseTime = 3L;
lock.lock(leaseTime, TimeUnit.SECONDS);
RedissonRedLock redLock = new RedissonRedLock(lock);
// 有参数的情况
redLock.lock();
  • 1
  • 2
  • 3
  • 4
  • 5

源码:

// 租约时间为 -1  相当于没有设置
@Override
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}
  • 1
  • 2
  • 3
  • 4
  • 5

只贴关键代码:
接着这里会调用lockInterruptibly()方法,计算waitTime后,来调用tryLock(long waitTime, long leaseTime, TimeUnit unit)

此时:

waitTime=4.5s leaseTime = -1
  • 1
...
long remainTime = -1;
if (waitTime != -1) {
	// remainTime = waitTime = 4.5s
    remainTime = unit.toMillis(waitTime);
}
// 这里红锁代码重写了calcLockWaitTime()方法
// 根据下面代码calcLockWaitTime分析,此时lockWaitTime=1s;
// 计算每个锁的等待时间 (4.5/3 = 1, 1)取最大,=1
long lockWaitTime = calcLockWaitTime(remainTime);
// 允许获取锁的失败次数=1
int failedLocksLimit = failedLocksLimit();
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

实际获取锁代码块:

...
// min(1, 4.5) = 1
// 在redLock场景中lockWaitTime永远都会比remainTime值小
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime=1, newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
...
// 无参的情况下leaseTime=-1,也就说无参的情况下,必须手动释放锁;redis不会自动释放
if (leaseTime != -1) {
    List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
    for (RLock rLock : acquiredLocks) {
        RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
        futures.add(future);
    }
    
    for (RFuture<Boolean> rFuture : futures) {
        rFuture.syncUninterruptibly();
    }
}
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

小结:redLock.lock() 需要手动释放锁;

联锁

红锁的代码就是基于联锁实现的,只是下面两个重写的方法实现不同而已:

calcLockWaitTime()
failedLocksLimit()
  • 1
  • 2

起始调用代码:

long leaseTime = 3L;
RedissonMultiLock multiLock = new RedissonMultiLock(lock);
multiLock.lock(leaseTime, TimeUnit.SECONDS);
  • 1
  • 2
  • 3

联锁multiLock.lock(leaseTime, TimeUnit.SECONDS)

源码分析:

public void lock(long leaseTime, TimeUnit unit) {
    try {
        lockInterruptibly(leaseTime, unit);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
// leaseTime=3s 
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    // 默认等待时间
    long baseWaitTime = locks.size() * 1500;
    long waitTime = -1;
    if (leaseTime == -1) {
    	// waitTime = 3 * 1.5 = 4.5s
        waitTime = baseWaitTime;
    } else {
        leaseTime = unit.toMillis(leaseTime);
        // 3s
        waitTime = leaseTime;
        if (waitTime <= 2000) {
            waitTime = 2000;
        } else if (waitTime <= baseWaitTime) {
        	// 3 <= 4.5 
        	// 3/2=1, 3  假设随机数为2
            waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
        } else {
            waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
        }
    }
    
    while (true) {
    	// waitTime 2s,leaseTime=3s
        if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
            return;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

可以看出,上面这段代码和红锁是一模一样的逻辑:

此时:
waitTime=2s ,leaseTime=3s

只贴主要代码:

long remainTime = -1;
if (waitTime != -1) {
	// 2s
    remainTime = unit.toMillis(waitTime);
}
// 联锁的场景下,就是remainTime=2s
long lockWaitTime = calcLockWaitTime(remainTime);
// 联锁的场景下,固定为0
int failedLocksLimit = failedLocksLimit();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

实际请求锁的代码块:

...
// 联锁的场景下,lockWaitTime和remainTime是相等的;
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime = lockWaitTime = remainTime = 2s,newLeaseTime=4s
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

假设失败了:

if (failedLocksLimit == 0) {
	// 失败次数用完了
    unlockInner(acquiredLocks);
    if (waitTime == -1) {
        return false;
    }
    // 重置失败次数、锁列表、遍历游标,这说明要进行重试了
    failedLocksLimit = failedLocksLimit();
    acquiredLocks.clear();
    // reset iterator
    while (iterator.hasPrevious()) {
        iterator.previous();
    }
}
// 时间控制   可以看出上面的逻辑存在重试机制,所以才有下面的超时判断逻辑
if (remainTime != -1) {
    remainTime -= System.currentTimeMillis() - time;
    time = System.currentTimeMillis();
    if (remainTime <= 0) {
        unlockInner(acquiredLocks);
        return false;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

小结:联锁拥有重试机制,multiLock.lock(leaseTime, TimeUnit.SECONDS)能自动释放锁;

联锁multiLock.lock()

起始调用代码:

RedissonMultiLock multiLock = new RedissonMultiLock(lock);
multiLock.lock();
  • 1
  • 2

在执行完随机生成waitTime逻辑之后;
此时:

waitTime=4.5s  leaseTime=-1
  • 1

贴主要代码:

long remainTime = -1;
if (waitTime != -1) {
	// remainTime = waitTime=4.5s
    remainTime = unit.toMillis(waitTime);
}
// lockWaitTime = remainTime = 4.5s
long lockWaitTime = calcLockWaitTime(remainTime);
// 固定为0
int failedLocksLimit = failedLocksLimit();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

实际执行的代码块:

...
// lockWaitTime = remainTime = 4.5s
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime=4.5s  newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

剩下的情况和有参是一样的;

小结:联锁拥有重试机制,multiLock.lock()需要手动释放锁;

redLock.tryLock()

起始调用代码:

RedissonRedLock redLock = new RedissonRedLock(lock);
redLock.tryLock();
  • 1
  • 2
public boolean tryLock() {
   try {
       return tryLock(-1, -1, null);
   } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       return false;
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

可以看出,这里是没有调用随机生成waitTime的逻辑,这里它全部都设置为了-1

此时:

waitTime=-1s  leaseTime=-1
  • 1

贴主要代码:

...
// 红锁 Max(-1/3, 1) 为1
long lockWaitTime = calcLockWaitTime(remainTime);
// 红锁 3 - 3/2+1 = 1
int failedLocksLimit = failedLocksLimit();
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

实际获取锁的代码块:

...
if (waitTime == -1 && leaseTime == -1) {
    // 逻辑会走这里 尝试获取锁
    lockAcquired = lock.tryLock();
}
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

我们自此可以知道redLock.tryLock();方法:
① 是需要手动释放锁的

redLock.tryLock(long waitTime, TimeUnit unit)

只贴主要代码:

tryLock(waitTime, -1, unit)
  • 1

即:
waitTime:传入值,假设为7s
leaseTime:-1

...
long remainTime = -1;
if (waitTime != -1) {
	// remainTime = 7s
    remainTime = unit.toMillis(waitTime);
}
//max(7/3=2, 1)=2
long lockWaitTime = calcLockWaitTime(remainTime);
// 1
int failedLocksLimit = failedLocksLimit();
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

实际请求锁的执行代码块:

...
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime = lockWaitTime = 2s, newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
  • 1
  • 2
  • 3
  • 4
  • 5

小节: 需要手动释放锁。

联锁multiLock.tryLock(waitTime, TimeUnit.SECONDS)

只贴主要代码:

public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    return tryLock(waitTime, -1, unit);
}
  • 1
  • 2
  • 3

即:
waitTime:前端传入值;假设为7s;
leaseTime: -1

// lockWaitTime = remainTime = waitTime =7s
long lockWaitTime = calcLockWaitTime(remainTime);
// 0
int failedLocksLimit = failedLocksLimit();
  • 1
  • 2
  • 3
  • 4

实际获取锁的代码块:

...
long awaitTime = Math.min(lockWaitTime, remainTime);
// awaitTime = lockWaitTime = remainTime = 7s, newLeaseTime=-1
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
...
  • 1
  • 2
  • 3
  • 4
  • 5

小节:需要手动释放锁;

联锁multiLock.tryLock()

只贴出关键代码:

tryLock(-1, -1, null);
  • 1

即:waitTime=-1,leaseTime=-1

受影响的变量:

// lockWaitTime = remainTime = waitTime =-1s
long lockWaitTime = calcLockWaitTime(remainTime);
// 0
int failedLocksLimit = failedLocksLimit();
  • 1
  • 2
  • 3
  • 4

实际请求锁代码块:

...
if (waitTime == -1 && leaseTime == -1) {
    lockAcquired = lock.tryLock();
}
...
  • 1
  • 2
  • 3
  • 4
  • 5

小节:需要手动释放锁

总结

相同点

  • 源码中的waitTime在红锁和联锁的场景下是一定有值,无参的情况下,单个实例默认时间是1500毫秒;
  • 红锁和联锁lock()有参的情况下,Redis会自动释放锁,而无参情况下,需要手动释放锁;
  • 无论是红锁场景还是联锁场景,tryLock()方法都需要手动释放锁;
  • 红锁和联锁tryLock()方法都不会执行重试机制;
  • 红锁和联锁都有重试机制,只要没有超时(失败后,剩余时间依然小于waitTime,并且没有超过允许的失败次数),那么就会进行重试;(除tryLock()方法)
  • 红锁和联锁都是可重入锁;
  • 红锁和联锁无论哪个方法,在没有设置leaseTime(不等于-1)的情况下,当业务程序没有跑完时,都会延长租约时间:默认值30s;

不同点

  • 联锁要求获取到所有的锁,而红锁只要求获取大部分锁,即过半就可以。
  • 联锁实际请求锁的等待时间,一定大于红锁实际请求锁的等待时间

红锁和联锁之所以会有上面的区别,因为在红锁算法思想:快速尝试获取锁,要是获取不到,就去获取下一个;只要获取的锁的数量大于一半即可。

方法是否需要手动释放锁是否有重试机制是否可重入看门狗是否会延长租约时间
redLock.lock()需要
redLock.lock(leaseTime, TimeUnit.SECONDS)不需要不会 (leaseTime=-1 会)
multiLock.lock()需要
multiLock.lock(leaseTime, TimeUnit.SECONDS)不需要不会 (leaseTime=-1 会)
redLock.tryLock()需要
redLock.tryLock(long waitTime, TimeUnit unit)需要
multiLock.tryLock(waitTime, TimeUnit.SECONDS)需要
multiLock.tryLock()需要

参考地址:

Redis分布式锁的租约续租实现

分布式锁和同步器

http://redis.cn/topics/distlock.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/200927
推荐阅读
  

闽ICP备14008679号