赞
踩
MutiLock 说明东西?就是可以将多个锁合并成一个大锁,对一个大锁进行统一的申请和释放。其实就是一次性的去锁定多个资源,然后处理业务,最后统一释放
我们看源码之前先思考一下,这个如果基于之前的一些思想,如何去实现这个功能????难道是就是对多个所RedissonLock 依次去加锁?所有的锁加锁成功就代表MultiLock加锁成功?
我们还是先看看Redisson如何使用,也就是我们看源码的入口对把
- public static void main(String[] args) throws Exception {
- //构建一个配置信息对象
- Config config = new Config();
- config.useClusterServers()
- //定时扫描连接信息 默认1000ms
- .setScanInterval(2000)
- .addNodeAddress("redis://127.0.0.1:7001");
- //因为Redisson 是基于redis封装的一套便于复杂操作的框架
- //所以这里构建对象肯定是创建一些与redis的连接
- RedissonClient redisson = Redisson.create(config);
-
- RLock lock1 = redisson.getLock("lock1");
- RLock lock2 = redisson.getLock("lock2");
- RedissonMultiLock multiLock = new RedissonMultiLock(lock1,lock2);
-
- multiLock.lock();
- //释放锁
- multiLock.unlock();
- }
感觉就是去将多个RLock锁实例进行统一的操作管理?
我们看看实例化MultiLock传入的参数干嘛了,将其存入到了一个list 数组中,记住名字 locks
- final List<RLock> locks = new ArrayList<RLock>();
-
- public RedissonMultiLock(RLock... locks) {
- if (locks.length == 0) {
- throw new IllegalArgumentException("Lock objects are not defined");
- }
- this.locks.addAll(Arrays.asList(locks));
- }
进入到RedissonMultiLock 内部,现在基本上和之前差不多的代码
- @Override
- public void lock() {
- try {
- lockInterruptibly();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- @Override
- public void lockInterruptibly() throws InterruptedException {
- lockInterruptibly(-1, null);
- }
- public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
- /基础等待时间 这里我们按照3个lock 来统一计算 baseWaitTime = 4500MS
- long baseWaitTime = locks.size() * 1500;
- long waitTime = -1;
- //这里还是支持一些获取锁超时机制
- //计算一些等待时间
- if (leaseTime == -1) {
- waitTime = baseWaitTime;
- unit = TimeUnit.MILLISECONDS;
- } else {
-
- waitTime = unit.toMillis(leaseTime);
- if (waitTime <= 2000) {
- waitTime = 2000;
- } else if (waitTime <= baseWaitTime) {
- waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
- } else {
- waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
- }
- waitTime = unit.convert(waitTime, TimeUnit.MILLISECONDS);
- }
-
-
- while (true) {
- //尝试获取锁 死循环,不停的尝试获取所有需要的锁,
- //只要没有全部获取到
- //或者未超时那么就会一直死循环
- if (tryLock(waitTime, leaseTime, unit)) {
- return;
- }
- }
- }
其实我们大概看了一下,MultiLock 这个东西最主要的逻辑不是lua脚本来实现,其实是一个 high level 高层次的API,基于low level 低层次的API 进行了封装
- public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
- // try {
- // return tryLockAsync(waitTime, leaseTime, unit).get();
- // } catch (ExecutionException e) {
- // throw new IllegalStateException(e);
- // }
- long newLeaseTime = -1;
- if (leaseTime != -1) {
- newLeaseTime = unit.toMillis(waitTime)*2;
- }
-
- long time = System.currentTimeMillis();
- long remainTime = -1;
- if (waitTime != -1) {
- //设置remainTime 这个是个什么参数?猜一下
- remainTime = unit.toMillis(waitTime);
- }
- //上锁等待时间
- long lockWaitTime = calcLockWaitTime(remainTime);
-
- int failedLocksLimit = failedLocksLimit();
- //保存已经获取到的锁
- List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
- for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
- RLock lock = iterator.next();
- boolean lockAcquired;
- try {
- if (waitTime == -1 && leaseTime == -1) {
- lockAcquired = lock.tryLock();
- } else {
- long awaitTime = Math.min(lockWaitTime, remainTime);
- //这里底层获取锁
- //这里使用的还是RedissonLock.tryLock(awaitTime, newLeaseTime。。。)方法,这个方法别忘记 4500毫秒必须获取到锁,如果获取不到就标记为失败,然后会启动一个watchdog 不停的去锁对应key的 过期时间,除非你传入了一个leaseTime 时间,就不会启动一个watchdog
- lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
- }
- } catch (Exception e) {
- lockAcquired = false;
- }
-
- if (lockAcquired) {
- acquiredLocks.add(lock);
- } else {
- if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
- break;
- }
-
- if (failedLocksLimit == 0) {
- unlockInner(acquiredLocks);
- if (waitTime == -1 && leaseTime == -1) {
- return false;
- }
- failedLocksLimit = failedLocksLimit();
- acquiredLocks.clear();
- // reset iterator
- while (iterator.hasPrevious()) {
- iterator.previous();
- }
- } else {
- failedLocksLimit--;
- }
- }
- //计算剩余时间,什么时间?就是去获取大锁的允许的时间长
- //这块逻辑是意思?就是看你尝试获取多个锁的时间 是否超过了设置的时间
- //最多不能超过假设4500ms
- //超过了那么就会释放掉所有的锁,同时返回false,获取MultiLock 失败
- //然后返回false 之后,会返回重新进入该方法,再来尝试4500ms中获取到multiLock
- if (remainTime != -1) {
- remainTime -= (System.currentTimeMillis() - time);
- time = System.currentTimeMillis();
- if (remainTime <= 0) {
- unlockInner(acquiredLocks);
- return false;
- }
- }
- }
-
- if (leaseTime != -1) {
- List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
- for (RLock rLock : acquiredLocks) {
- RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
- futures.add(future);
- }
-
- for (RFuture<Boolean> rFuture : futures) {
- rFuture.syncUninterruptibly();
- }
- }
-
- return true;
- }
加锁其实到这已经结束了
那么如何释放锁?其实肯定也是调用底层原本RedissonLock 的API,最后肯定执行的是lua脚本对吧,这里我们就不详细讲了,看下源码打架都能理解
- @Override
- public void unlock() {
- List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(locks.size());
- //依次调用底层的RedissonLock方法异步释放锁
- for (RLock lock : locks) {
- futures.add(lock.unlockAsync());
- }
- //异步转同步
- for (RFuture<Void> future : futures) {
- future.syncUninterruptibly();
- }
- }
- @Override
- public RFuture<Void> unlockAsync() {
- long threadId = Thread.currentThread().getId();
- return unlockAsync(threadId);
- }
- @Override
- public RFuture<Void> unlockAsync(final long threadId) {
- final RPromise<Void> result = new RedissonPromise<Void>();
- RFuture<Boolean> future = unlockInnerAsync(threadId);
-
- future.addListener(new FutureListener<Boolean>() {
- @Override
- public void operationComplete(Future<Boolean> future) throws Exception {
- if (!future.isSuccess()) {
- cancelExpirationRenewal(threadId);
- result.tryFailure(future.cause());
- return;
- }
-
- Boolean opStatus = future.getNow();
- if (opStatus == null) {
- IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
- + id + " thread-id: " + threadId);
- result.tryFailure(cause);
- return;
- }
- if (opStatus) {
- cancelExpirationRenewal(null);
- }
- result.trySuccess(null);
- }
- });
-
- return result;
- }
-
- protected RFuture<Boolean> unlockInnerAsync(long threadId) {
- return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- "if (redis.call('exists', KEYS[1]) == 0) then " +
- "redis.call('publish', KEYS[2], ARGV[1]); " +
- "return 1; " +
- "end;" +
- "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
- "return nil;" +
- "end; " +
- "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
- "if (counter > 0) then " +
- "redis.call('pexpire', KEYS[1], ARGV[2]); " +
- "return 0; " +
- "else " +
- "redis.call('del', KEYS[1]); " +
- "redis.call('publish', KEYS[2], ARGV[1]); " +
- "return 1; "+
- "end; " +
- "return nil;",
- Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。