赞
踩
基于setnx实现的分布式锁存在下面的问题:
不可重入:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
**超时释放:**锁超时释放虽然可以避免死锁,但如果业务耗时较长,也会导致锁释放,存在着安全隐患。
主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。概率低,同步是毫秒级.
那么什么是Redission呢?
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。简单说就是redis在分布式系统上工具的集合,Redission提供了分布式锁的多种多样的功能.
引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置Redisson客户端:
import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient(){ // 配置单节点 Config config = new Config(); //多节点config.useClusterServers() config.useSingleServer().setAddress("redis://192.168.150.101:6379") .setPassword("123321"); // 创建RedissonClient对象 return Redisson.create(config); } }
如何使用Redission的分布式锁
@Resource private RedissionClient redissonClient; @Test void testRedisson() throws Exception{ //获取锁(可重入),指定锁的名称 RLock lock = redissonClient.getLock("anyLock"); //尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位 boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS); //判断获取锁成功 if(isLock){ try{ System.out.println("执行业务"); }finally{ //释放锁 lock.unlock(); } } }
在 VoucherOrderServiceImpl
注入RedissonClient
@Resource private RedissonClient redissonClient; @Override public Result seckillVoucher(Long voucherId) { // 1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2.判断秒杀是否开始 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀尚未开始!"); } // 3.判断秒杀是否已经结束 if (voucher.getEndTime().isBefore(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀已经结束!"); } // 4.判断库存是否充足 if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); //创建锁对象 这个代码不用了,因为我们现在要使用分布式锁 //SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); RLock lock = redissonClient.getLock("lock:order:" + userId); //获取锁对象 boolean isLock = lock.tryLock(); //加锁失败 if (!isLock) { return Result.fail("不允许重复下单"); } try { //获取代理对象(事务) IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { //释放锁 lock.unlock(); } }
在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1 ;
对于synchronized而言,他在底层语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。
@Autowired private RedissonClient redissonClient; public void method1() { RLock lock = redissonClient.getLock("lock"); boolean isLock = lock.tryLock(); if (!isLock) { log.error("获取锁失败,1"); } try { log.info("获取锁成功,1"); method2(); }finally { log.info("释放锁,1"); lock.unlock(); } } public void method2() { RLock lock = redissonClient.getLock("lock"); boolean isLock = lock.tryLock(); if (!isLock) { log.error("获取锁失败,2"); } try { log.info("获取锁成功,2"); }finally { log.info("释放锁,2"); lock.unlock(); } }
因为setnx 无法实现可重入,所以底层使用hash结构来进行存储.原理类似Lock
key | field | Value |
---|---|---|
lock | thread_id | 1 |
锁名称 | 锁唯一标识 | 锁值 入锁加一,出锁减1.为0 删除锁 |
获取锁的Lua脚本源码
-- 判断锁是否存在 if (redis.call('exists', KEYS[1]) == 0) then -- 不存在 获取锁 redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 设置有效器 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 锁存在,判断threadId是否属于自己 redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 设置有效期 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);
释放锁的Lua脚本源码
local key = KEYS[1] -- 锁的key local threadId = ARGV[1] -- 线程唯一标识 local releaseTime = ARGV[2] -- 锁的自动释放时间 -- 判断锁是否还被自己持有 if (redis.call('hexists', key,threadId) == 0) then -- 不是自己的锁 直接返回 return nil; end; --是自己的锁,重入次数 -1 local count = redis.call('hincrby',key,threadId,-1); --判断重入次数是否为0 if (count > 0) then -- 大于0,说明不能释放锁,重置有效期然后返回 redis.call('expire', key,releaseTime); return nil; else redis.call("del",key); return nil; end
//源码 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { //1.将等待时间转换为毫秒数,获取当前的线程 long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); //2.尝试获取锁,返回null 代表没有锁,返回有值标识锁的过期时间 Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // 3.成功获取锁 if (ttl == null) { return true; } //4.尝试获取锁耗时超过了等待时间,确认失败 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } current = System.currentTimeMillis(); //5.消息队列 订阅了其他线程释放锁的信号 //在unlock 脚本中 有一个 redis.call('publish',key[2],argv[1]) RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); //6.当这个future 在指定时间内完成,返回true,否则false if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { //等到最大等待时间结束,还没有等到,取消订阅,返回false subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } //7.再次判断时间是否超出 try { time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } //todo 8.开始锁重试 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); //9.利用信号量来进行获取 if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally { //取消订阅 unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
场景
Redisson锁重试的问题是解决了, 但是总会发生一些问题, 如果我们的业务阻塞超时了ttl到期了, 别的线程看见我们的ttl到期了, 他重试他就会拿到本该属于我们的锁, 这时候就有安全问题了, 所以该怎么解决?
我们必须确保锁是业务执行完释放的, 而不是因为阻塞而释放的
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { //自定义了时间 if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //2.默认过期实践 30s 看门狗 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), //3. 当ttlRemainingFuture的异步尝试获取锁完成以后, //先判断执行过程中是否有异常, 如果有异常就直接返回了结束执行. //如果没有发生异常, 则判断ttlRemaining(剩余有效期)是否为空, //为空的话就代表获取锁成功, 执行锁到期续约的核心方法scheduleExpectationRenew TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { //更新有效期,内部源码 通过定时任务每隔10s,定时重置有效期 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
那么什么时候释放锁呢?
当然是在释放锁的时候,具体连接如下:
总结
执行流程
为了提高redis的可用性,我们会搭建集群或者主从,以主从为例,此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。
为了解决这个问题,redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。
WatchDog 机制 和 MutiLock原理
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。