赞
踩
分布式锁:满足分布式系统或集群模式下多线程课件并且可以互斥的锁
分布式锁的核心思想就是让大家共用同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
那么分布式锁应该满足一些什么条件呢?
可见性:多个线程都能看到相同的结果。
注意:这里说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
互斥:互斥是分布式锁的最基本条件,使得程序串行执行
高可用:程序不易崩溃,时时刻刻都保证较高的可用性
高性能:由于加锁本身就让性能降低,所以对于分布式锁需要他较高的加锁性能和释放锁性能
安全性:安全也是程序中必不可少的一环
常见的分布式锁有三种
实现分布式锁时需要实现两个基本方法
SET lock thread01 NX EX 10
DEL lock
核心思路
我们利用redis的SETNX方法,当有多个线程进入时,我们就利用该方法来获取锁。第一个线程进入时,redis 中就有这个key了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁(返回了0)的线程,等待一定时间之后重试
public interface ILock {
/**
* 尝试获取锁
*
* @param timeoutSec 锁持有的超时时间,过期自动释放
* @return true表示获取锁成功,false表示获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
public class SimpleRedisLock implements ILock {
//锁的前缀
private static final String KEY_PREFIX = "lock:";
//具体业务名称,将前缀和业务名拼接之后当做Key
private String name;
//这里不是@Autowired注入,采用的是构造器注入,在创建SimpleRedisLock时,将RedisTemplate作为参数传入
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识
long threadId = Thread.currentThread().getId();
//获取锁,使用SETNX方法进行加锁,同时设置过期时间,防止死锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
//自动拆箱可能会出现null,这样写更稳妥
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
//通过DEL来删除锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
@Override
public Result seckillVoucher(Long voucherId) {
LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper<>();
//1. 查询优惠券
queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId);
SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper);
//2. 判断秒杀时间是否开始
if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
return Result.fail("秒杀还未开始,请耐心等待");
}
//3. 判断秒杀时间是否结束
if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
return Result.fail("秒杀已经结束!");
}
//4. 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
return Result.fail("优惠券已被抢光了哦,下次记得手速快点");
}
Long userId = UserHolder.getUser().getId();
// 创建锁对象
SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 获取锁对象
boolean isLock = redisLock.tryLock(120);
// 加锁失败,说明当前用户开了多个线程抢优惠券,但是由于key是SETNX的,所以不能创建key,得等key的TTL到期或释放锁(删除key)
if (!isLock) {
return Result.fail("不允许抢多张优惠券");
}
try {
// 获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
redisLock.unlock();
}
}
在nginx负载均衡分布式服务中
使用Jmeter进行压力测试,请求头中携带登录用户的token,最终只能抢到一张优惠券
逻辑说明
解决方案
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 获取当前线程的标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标识是否一致
if (threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
Redis提供的调用函数语法如下
redis.call('命令名称','key','其他参数', ...)
例如我们要执行set name Kyle,则脚本是这样
redis.call('set', 'name', 'Kyle')
例如我我们要执行set name David,在执行get name,则脚本如下
## 先执行set name David
redis.call('set', 'name', 'David')
## 再执行get name
local name = redis.call('get', 'name')
## 返回
return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下
EVAL script numkeys key [key ...] arg [arg ...]
例如,我们要调用redis.call(‘set’, ‘name’, ‘Kyle’) 0这个脚本,语法如下
EVAL "return redis.call('set', 'name', 'Kyle')" 0
如果脚本中的key和value不想写死,可以作为参数传递,key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组中获取这些参数
注意:在Lua中,数组下标从1开始
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Lucy
那现在我们来使用Lua脚本来代替我们释放锁的逻辑
@Override
public void unlock() {
// 获取当前线程的标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标识是否一致
if (threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
-- 线程标识
local threadId = "UUID-31"
-- 锁的key
local key = "lock:order:userId"
-- 获取锁中线程标识
local id = redis.call('get', key)
-- 比较线程标识与锁的标识是否一致
if (threadId == id) then
-- 一致则释放锁 del key
return redis.call('del', key)
end
return 0
可以简写下面代码
-- 这里的KEYS[1]就是传入锁的key
-- 这里的ARGV[1]就是线程标识
-- 比较锁中的线程标识与线程标识是否一致
if (redis.call('get', KEYS[1]) == ARGV[1]) then
-- 一致则释放锁
return redis.call('del', KEYS[1])
end
return 0
在RedisTemplate中,可以利用execute方法去执行lua脚本
public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {
return this.scriptExecutor.execute(script, keys, args);
}
对应的Java代码如下
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public void unlock() {
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现
Redis提供了分布式锁的多种多样功能
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://101.XXX.XXX.160:6379")
.setPassword("root");
return Redisson.create(config);
}
}
@Resource
private RedissonClient redissonClient;
@Test
void testRedisson() throws InterruptedException {
//获取可重入锁
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,三个参数分别是:获取锁的最大等待时间(期间会重试),锁的自动释放时间,时间单位
boolean success = lock.tryLock(1,10, TimeUnit.SECONDS);
//判断获取锁成功
if (success) {
try {
System.out.println("执行业务");
} finally {
//释放锁
lock.unlock();
}
}
}
+ @Resource
+ private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper<>();
//1. 查询优惠券
queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId);
SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper);
//2. 判断秒杀时间是否开始
if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
return Result.fail("秒杀还未开始,请耐心等待");
}
//3. 判断秒杀时间是否结束
if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
return Result.fail("秒杀已经结束!");
}
//4. 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
return Result.fail("优惠券已被抢光了哦,下次记得手速快点");
}
Long userId = UserHolder.getUser().getId();
- SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
+ RLock redisLock = redissonClient.getLock("order:" + userId);
- boolean isLock = redisLock.tryLock(120);
+ boolean isLock = redisLock.tryLock();
if (!isLock) {
return Result.fail("不允许抢多张优惠券");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
redisLock.unlock();
}
}
修改后
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
LambdaQueryWrapper<SeckillVoucher> queryWrapper = new LambdaQueryWrapper<>();
//1. 查询优惠券
queryWrapper.eq(SeckillVoucher::getVoucherId, voucherId);
SeckillVoucher seckillVoucher = seckillVoucherService.getOne(queryWrapper);
//2. 判断秒杀时间是否开始
if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
return Result.fail("秒杀还未开始,请耐心等待");
}
//3. 判断秒杀时间是否结束
if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
return Result.fail("秒杀已经结束!");
}
//4. 判断库存是否充足
if (seckillVoucher.getStock() < 1) {
return Result.fail("优惠券已被抢光了哦,下次记得手速快点");
}
Long userId = UserHolder.getUser().getId();
RLock redisLock = redissonClient.getLock("order:" + userId);
boolean isLock = redisLock.tryLock();
if (!isLock) {
return Result.fail("不允许抢多张优惠券");
}
try {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
redisLock.unlock();
}
}
使用Jmeter进行压力测试,依旧是只能抢到一张优惠券,满足我们的需求
在Lock锁中,他是借助于等曾的一个voaltile的一个state变量来记录重入的状态的
在redisson中,我们也支持可重入锁
@Resource
private RedissonClient redissonClient;
private RLock lock;
@BeforeEach
void setUp() {
lock = redissonClient.getLock("lock");
}
@Test
void method1() {
boolean success = lock.tryLock();
if (!success) {
log.error("获取锁失败,1");
return;
}
try {
log.info("获取锁成功");
method2();
} finally {
log.info("释放锁,1");
lock.unlock();
}
}
void method2() {
RLock lock = redissonClient.getLock("lock");
boolean success = lock.tryLock();
if (!success) {
log.error("获取锁失败,2");
return;
}
try {
log.info("获取锁成功,2");
} finally {
log.info("释放锁,2");
lock.unlock();
}
}
所以我们需要额外判断,method1和method2是否处于同一线程,如果是同一个线程,则可以拿到锁,但是state会+1,之后执行method2中的方法,释放锁,释放锁的时候也只是将state进行-1,只有减至0,才会真正释放锁
由于我们需要额外存储一个state,所以用字符串型SET NX EX是不行的,需要用到Hash结构,但是Hash结构又没有NX这种方法,所以我们需要将原有的逻辑拆开,进行手动判断
为了保证原子性,所以流程图中的业务逻辑也是需要我们用Lua来实现的
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 锁不存在
if (redis.call('exists', key) == 0) then
-- 获取锁并添加线程标识,state设为1
redis.call('hset', key, threadId, '1');
-- 设置锁有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
-- 锁存在,判断threadId是否为自己
if (redis.call('hexists', key, threadId) == 1) then
-- 锁存在,重入次数 +1,这里用的是hash结构的incrby增长
redis.call('hincrby', key, thread, 1);
-- 设置锁的有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];
-- 如果锁不是自己的
if (redis.call('HEXISTS', key, threadId) == 0) then
return nil; -- 直接返回
end;
-- 锁是自己的,锁计数-1,还是用hincrby,不过自增长的值为-1
local count = redis.call('hincrby', key, threadId, -1);
-- 判断重入次数为多少
if (count > 0) then
-- 大于0,重置有效期
redis.call('expire', key, releaseTime);
return nil;
else
-- 否则直接释放锁
redis.call('del', key);
return nil;
end;
获取锁源码
查看源码,跟我们的实现方式几乎一致
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
释放锁源码
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
}
前面我们分析的是空参的tryLock方法,现在我们来分析一下这个带参数的
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
源码分析
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 如果没有指定释放时间时间,则指定默认释放时间为getLockWatchdogTimeout,底层源码显示是30*1000ms,也就是30秒
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
//判断ttl是否为null
if (ttl == null) {
return true;
} else {
//计算当前时间与获取锁时间的差值,让等待时间减去这个值
time -= System.currentTimeMillis() - current;
//如果消耗时间太长了,直接返回false,获取锁失败
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
//等待时间还有剩余,再次获取当前时间
current = System.currentTimeMillis();
//订阅别人释放锁的信号
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
//在剩余时间内,等待这个信号
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
//取消订阅
this.unsubscribe(subscribeFuture, threadId);
}
});
}
//剩余时间内没等到,返回false
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
try {
//如果剩余时间内等到了别人释放锁的信号,再次计算当前剩余最大等待时间
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
//如果剩余时间为负数,则直接返回false
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
} else {
boolean var16;
do {
//如果剩余时间等到了,dowhile循环重试获取锁
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
}
}
}
}
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
//不存在,才put,表明是第一次进入,不是重入
ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
//如果是第一次进入,则跟新有效期
entry.addThreadId(threadId);
this.renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
//Timeout是一个定时任务
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = (ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
//重置有效期
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
} else {
if (res) {
//然后调用自己,递归重置有效期
RedissonLock.this.renewExpiration();
}
}
});
}
}
}
//internalLockLeaseTime是之前WatchDog默认有效期30秒,那这里就是 30 / 3 = 10秒之后,才会执行
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
那么之前的重置有效期的行为该怎么终止呢?当然是释放锁的时候会终止
cancelExpirationRenewal
void cancelExpirationRenewal(Long threadId) {
//将之前的线程终止掉
ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (task != null) {
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
//获取之前的定时任务
Timeout timeout = task.getTimeout();
if (timeout != null) {
//取消
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
}
}
}
为了提高Redis的可用性,我们会搭建集群或者主从,现在以主从为例
此时我们去写命令,写在主机上,主机会将数据同步给从机,但是假设主机还没来得及把数据写入到从机去的时候,主机宕机了
哨兵会发现主机宕机了,于是选举一个slave(从机)变成master(主机),而此时新的master(主机)上并没有锁的信息,那么其他线程就可以获取锁,又会引发安全问题
为了解决这个问题。Redisson提出来了MutiLock锁,使用这把锁的话,那我们就不用主从了,每个节点的地位都是一样的,都可以当做是主机,那我们就需要将加锁的逻辑写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获取锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性
我们先使用虚拟机额外搭建两个Redis节点
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.137.130:6379")
.setPassword("root");
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2() {
Config config = new Config();
config.useSingleServer().setAddress("redis://92.168.137.131:6379")
.setPassword("root");
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3() {
Config config = new Config();
config.useSingleServer().setAddress("redis://92.168.137.132:6379")
.setPassword("root");
return Redisson.create(config);
}
}
使用联锁,我们首先要注入三个RedissonClient对象
@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;
private RLock lock;
@BeforeEach
void setUp() {
RLock lock1 = redissonClient.getLock("lock");
RLock lock2 = redissonClient2.getLock("lock");
RLock lock3 = redissonClient3.getLock("lock");
lock = redissonClient.getMultiLock(lock1, lock2, lock3);
}
@Test
void method1() {
boolean success = lock.tryLock();
redissonClient.getMultiLock();
if (!success) {
log.error("获取锁失败,1");
return;
}
try {
log.info("获取锁成功");
method2();
} finally {
log.info("释放锁,1");
lock.unlock();
}
}
void method2() {
RLock lock = redissonClient.getLock("lock");
boolean success = lock.tryLock();
if (!success) {
log.error("获取锁失败,2");
return;
}
try {
log.info("获取锁成功,2");
} finally {
log.info("释放锁,2");
lock.unlock();
}
}
源码分析
我们没有传入锁对象来创建联锁的时候,则会抛出一个异常,反之则将我们传入的可变参数锁对象封装成一个集合
public RedissonMultiLock(RLock... locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
} else {
this.locks.addAll(Arrays.asList(locks));
}
}
联锁的tryLock
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1L;
//如果传入了释放时间
if (leaseTime != -1L) {
//再判断一下是否有等待时间
if (waitTime == -1L) {
//如果没传等待时间,不重试,则只获得一次
newLeaseTime = unit.toMillis(leaseTime);
} else {
//想要重试,耗时较久,万一释放时间小于等待时间,则会有问题,所以这里将等待时间乘以二
newLeaseTime = unit.toMillis(waitTime) * 2L;
}
}
//获取当前时间
long time = System.currentTimeMillis();
//剩余等待时间
long remainTime = -1L;
if (waitTime != -1L) {
remainTime = unit.toMillis(waitTime);
}
//锁等待时间,与剩余等待时间一样
long lockWaitTime = this.calcLockWaitTime(remainTime);
//锁失败的限制,源码返回是的0
int failedLocksLimit = this.failedLocksLimit();
//已经获取成功的锁
List<RLock> acquiredLocks = new ArrayList(this.locks.size());
//迭代器,用于遍历
ListIterator<RLock> iterator = this.locks.listIterator();
while(iterator.hasNext()) {
RLock lock = (RLock)iterator.next();
boolean lockAcquired;
try {
//没有等待时间和释放时间,调用空参的tryLock
if (waitTime == -1L && leaseTime == -1L) {
lockAcquired = lock.tryLock();
} else {
//否则调用带参的tryLock
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException var21) {
this.unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception var22) {
lockAcquired = false;
}
//判断获取锁是否成功
if (lockAcquired) {
//成功则将锁放入成功锁的集合
acquiredLocks.add(lock);
} else {
//如果获取锁失败
//判断当前锁的数量,减去成功获取锁的数量,如果为0,则所有锁都成功获取,跳出循环
if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) {
break;
}
//否则将拿到的锁都释放掉
if (failedLocksLimit == 0) {
this.unlockInner(acquiredLocks);
//如果等待时间为-1,则不想重试,直接返回false
if (waitTime == -1L) {
return false;
}
failedLocksLimit = this.failedLocksLimit();
//将已经拿到的锁都清空
acquiredLocks.clear();
//将迭代器往前迭代,相当于重置指针,放到第一个然后重试获取锁
while(iterator.hasPrevious()) {
iterator.previous();
}
} else {
--failedLocksLimit;
}
}
//如果剩余时间不为-1,很充足
if (remainTime != -1L) {
//计算现在剩余时间
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
//如果剩余时间为负数,则获取锁超时了
if (remainTime <= 0L) {
//将之前已经获取到的锁释放掉,并返回false
this.unlockInner(acquiredLocks);
//联锁成功的条件是:每一把锁都必须成功获取,一把锁失败,则都失败
return false;
}
}
}
//如果设置了锁的有效期
if (leaseTime != -1L) {
List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size());
//迭代器用于遍历已经获取成功的锁
Iterator var24 = acquiredLocks.iterator();
while(var24.hasNext()) {
RLock rLock = (RLock)var24.next();
//设置每一把锁的有效期
RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
var24 = futures.iterator();
while(var24.hasNext()) {
RFuture<Boolean> rFuture = (RFuture)var24.next();
rFuture.syncUninterruptibly();
}
}
//但如果没设置有效期,则会触发WatchDog机制,自动帮我们设置有效期,所以大多数情况下,我们不需要自己设置有效期
return true;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。