赞
踩
我相信很多人学分布式锁最大的动力并不是他自己的系统需要,而是面试官需要。。。当然,这也侧面说明分布锁很重要,经常作为考题,在学习之前,我们要先明确几个问题。
当然重要,只要访问临界资源的时候,都会用到锁,要不然就会有线程安全问题。
这里需要的明确一个问题,这些Java自带的锁都是在同一个JVM中发挥作用的,如果是在分布式的服务中,会有多个JVM虚拟机下的服务进行并发访问,这些锁是发挥不了作用的。分布式环境下,锁需要第三方服务提供。
其实说白了,只要能存储数据的数据库都能实现分布锁,因为我们只需要告诉其它线程,当前资源被占用了就可以了,其实synchronized和Lock也是存储一个标记,告知别的线程,当前资源有没有被占用而已,没什么神秘的。
首先我们需要明白,分布式锁应该达到什么样的要求:
接下来简单介绍下三种锁实现原理,以及他们的优缺点。
基于MySQL的锁主要是两种实现方式,一种是悲观锁,一种是乐观锁。
Redis的一些操作和特性,可能参考这篇文章:Redis学习笔记。这里直接介绍需要用到的三个命令,分别是:SETNX、expire、delete。
实现思想为:
ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock)。每个 Znode 上都会保存自己的数据内容,同时还会保存一系列属性信息。Znode 又分为四种类型:持久节点、持久顺序节点、临时节点、临时顺序节点。
ZooKeeper 分布式锁是基于临时顺序节点来实现的,锁可理解为 ZooKeeper 上的一个节点,当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。
而且用临时顺序节点的另外一个用意是,如果某个客户端创建临时顺序节点后,自己意外宕机了也没关系,ZooKeeper 感知到某个客户端宕机后会自动删除对应的临时顺序节点,相当于自动释放锁。
从实现容易度上比较:MySQL数据库 > Zookeeper > Redis缓存。
从性能上比较:Redis缓存 > Zookeeper > MySQL数据库。
从可靠性上比较:Zookeeper > MySQL数据库 > Redis缓存。
用Redis做代码实现时,我们需要考虑以下几个问题:
那么这些实现起来太麻烦怎么办,直接用别人封装好的库即可:Redisson。不但更加方便,也更加稳定,代码如下:
// 1.构造redisson实现分布式锁必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("password").setDatabase(0);
// 2.构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 3.获取锁对象实例
RLock rLock = redissonClient.getLock(lockKey);
try {
// 4.尝试获取锁
boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
if (res) {
// 获得锁成功,处理业务
}
} catch (Exception e) {
// 继续等待,或执行别的操作
throw new RuntimeException("aquire lock fail");
}finally{
// 无论如何, 最后都要解锁
rLock.unlock();
}
Redisson的实现方法如下:
tryLock实现方法如下:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
/**
* 订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
* 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.
* 当 this.await 返回 true,进入循环尝试获取锁.
*/
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果(应用了 Netty 的 Future)
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
// 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
/**
* 收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁
* 获取锁成功,则立马返回 true,
* 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环
*/
while (true) {
long currentTime = System.currentTimeMillis();
// 再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 超过最大等待时间则返回 false 结束循环,获取锁失败
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
/**
* 阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息):
*/
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
//如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
//则就在wait time 时间范围内等待可以通过信号量
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间)
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
// 无论是否获得锁,都要取消订阅解锁消息
unsubscribe(subscribeFuture, threadId);
}
return get(tryLockAsync(waitTime, leaseTime, unit));
}
Redisson看门狗续锁机制
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 如果带有过期时间,则按照普通方式获取锁
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 先按照30秒的过期时间来执行获取锁的方法
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
续期原理
就是用lua脚本,将锁的时间重置为30s
/*
Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,
然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下,如果客户端 还持有锁 key
(判断客户端是否还持有 key,其实就是遍历 EXPIRATION_RENEWAL_MAP 里面线程 id 然后根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),
那么就会不断的延长锁 key 的生存时间。如果服务宕机了,Watch Dog 机制线程也就没有了,
此时就不会延长 key 的过期时间,到了 30s 之后就会自动过期了,其他线程就可以获取到锁。
*/
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。