赞
踩
我们在开始这阶段的探索前,我们先来看一个例子,商品的库存扣减问题:如下图示
我们都知道,这段代码如果在并发的场景下,若不做任何的保护操作,商品的库存数据是线程不安全的,例如:当前同时有三个线程同时来扣减,假设商品总库存有300,三个线程同时纳300-1,那存库还有299,和期望的结果297不符,那就会出现超卖的情况,特别是在商品促销的场景下,商家可能拿出一部分商品来做引流的,那这一顿操作下来,商家也有可能是亏本,那如何解决呢?
我们都知道,在并发安全里,我们可以加锁,例如Synchronized或者ReentrantLock,但是,这种加锁方式只适合单机服务器下,如果当前的程序部署在不同的机器上,就不能保证线程安全了,这时,我们就引入分布式锁
在业务代码中,我们可以使用Redis的SetNx指令来设置分布式锁,如下代码:
@Autowired private StringRedisTemplate stringRedisTemplate; public String deductStock() { String lockKey = "lock:product_101"; Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "productLockTest"); if (!result) { return "error_code"; } try { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { stringRedisTemplate.delete(lockKey); } return "end"; }
在上述代码中,我们尝试在Redis服务获取SetNx的值,如果结果是false,返回相关的错误码或设置相关的业务处理逻辑,但当前的线程执行完毕后,我们需要将这个key清除,避免其他线程无法加锁成功,所以在finally模块中del掉当前的可以
这样子设置分布式锁会存在问题吗?答案是肯定的,如果是当前程序在执行finally模块代码前,服务宕机了,那当前key就没删除,那启动时其他线程也无法加锁,所以这里我们加入了失效时间问题,改造后的代码如下:
@Autowired private StringRedisTemplate stringRedisTemplate; public String deductStock() { String lockKey = "lock:product_101"; Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30,TimeUnit.SECONDS); //改造点==>添加key失效时间 if (!result) { return "error_code"; } try { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { stringRedisTemplate.delete(lockKey); } return "end"; }
同样添加完时间也还是会存在问题,如果当前程序执行时间过长,当前线程设置的key时间超时,那其他线程就可以查询设置这个key了,若当前线程执行完毕之后,去del此时的key时,其实这个key也不再是当前线程添加的了,而是其线程加的,也就是说,当前线程删除了其他线程加入的key,那删除之后,剩下的线程依然可以加锁,以此类推,还是存在超卖的问题,所以还需要再次改造,改造后的代码如下:
@Autowired private StringRedisTemplate stringRedisTemplate; public String deductStock() { String lockKey = "lock:product_101"; Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30,TimeUnit.SECONDS); //添加key失效时间 String clientId = UUID.randomUUID().toString();//改造点==>添加标识 if (!result) { return "error_code"; } try { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock") if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value) System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { //stringRedisTemplate.delete(lockKey); //改造点,根据当前线程设置的标识上次key if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) { stringRedisTemplate.delete(lockKey); } } return "end"; }
这种方式可以缓解一部分,但是是属于治标不治本的方案,再退一步考虑,如果当前线程执行到这个判断条件了,条件判断成立了,正在准备删除key操作时,由于你不可控因素,卡顿了,这是当前线程设置的超时时间也到了,这间隙,其他线程查询执行SetNx,设置完毕后,当前线程恢复,执行删除key操作,还是把其他线程设置的key删除 了,那如何解决呢,我们这就需要引入==>锁续命问题,看下一小结的Redisson框架,会结合其底层源码剖析这些问题的解决方案
接着前面设置分布式锁遗留的诸多问题,我们来看一个解决分布式锁的框架==>Redisson,其原理其实也是利用Redis的SetNx来加锁的(使用的lua脚本操作),其结构图示如下:
在上图中,其实是形成了一个闭环,接下来我们就来结合其源码分析其多线程获取Redis锁的一个过程是如何的,我们需要关注的点主要有:
1. 加锁的过程是如何的,加锁成功的线程与没成功的线程返回的状态具体实现是什么?
2. 锁续命是如何实现的?
3. 没加锁成功的线程是如何实现while自旋循环,间歇性尝试获取锁?以及获取锁的线程在超时间时间之前完成任务,是如何唤醒正在等待的线程?
首先,我们先来分析加锁的过程:
redisson.getLock(lockKey);
在这个方法中,其会调到RedissonLock类中的lockInterruptibly方法,源码如下图:
调用的tryAcquire方法中会调用tryLockInnerAsync进行异步加锁
其方法的源码为:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { this.internalLockLeaseTime = unit.toMillis(leaseTime); return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then" "redis.call('hset', KEYS[1], ARGV[2], 1); " "redis.call('pexpire', KEYS[1], ARGV[1]); " "return nil;" "end;" "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" "redis.call('hincrby', KEYS[1], ARGV[2], 1);" "redis.call('pexpire', KEYS[1], ARGV[1]);" "return nil;" "end;" "return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}); }
这里就是一段Redis的lua脚本,其开始的if主要是判断当前Redis中是否存在传入的key数据,如果没有。就设置key数据,同时设置其超时间为传入指定的时间,然后返回nil(相当于Java中的null值),第二段的if主要是设置可重入操作,否则说明当前Redis中已经存在指定的key值,返回当前key剩余的时间
因为是异步调用的,返回值为RFuture对象,调用完毕后,会回调外层的addListener方法,这个方法中会判断是否加锁成功,如果加锁成功,会调用scheduleExpirationRenewal方法刷新锁时间(锁续命),源码如下:
private void scheduleExpirationRenewal(final long threadId) { if (!expirationRenewalMap.containsKey(this.getEntryName())) { Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { RFuture<Boolean> future = RedissonLock.this.commandExecutor. evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, //执行lua脚本,对当前线程持有的锁进行超时时间重新设置 RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)" "then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)}); future.addListener(new FutureListener<Boolean>() { public void operationComplete(Future<Boolean> future) throws Exception { RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName()); if (!future.isSuccess()) { RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause()); } else { if ((Boolean)future.getNow()) { //递归调用 RedissonLock.this.scheduleExpirationRenewal(threadId); } } } }); } //this.internalLockLeaseTime默认是30,也就是说最后10秒时,会执行run方法,类似延时执行 }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) { task.cancel(); } } }
那如果是没有加锁成功的线程,第一步我们已经提及会返回ttl,也就是当前持有锁的线程剩余时间,外层会进行判断,如下:
这里面,主要采用的是Redis的发布订阅模式,因为有些线程在执行任务时,可能在超时间时间到达之前就完成了任务,这时,就需要唤醒正在等待的线程,这里的实现主要是采用了Redis的发布订阅模式
否则的话,没有加锁成功的线程就会进入while循环,利用Semaphore方式,尝试获取锁,设置的阻塞等待时间为上一把锁剩余的时间ttl,等待时间过了,再次尝试去获取Redis的锁资源,所以这里才说是间歇性获取锁,如下图:
释放锁时,具体源码如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0)" "then redis.call('publish', KEYS[2], ARGV[1]);" "return 1;" "end;" "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then" "return nil;" "end;" "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" "if (counter > 0) then" "redis.call('pexpire', KEYS[1], ARGV[2]);" "return 0;" "else redis.call('del', KEYS[1]);" "redis.call('publish', KEYS[2], ARGV[1]);" "return 1;" "end;" "return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)}); }
这的主要动作有,删除key,除此外,还需要对外广播消息(publish),唤醒其等待的线程获取锁,还有就是接触重入锁的key
在Redis的主从架构中,如果在加锁操作时,主节点加锁成功,但是还未来得及将信息同步到从节点,会有一种现象,也就是当从节点通过选举变为主节点Master后,是不带之前加过锁的数据的,此时,就造成了所失效,具体如下图:
那解决方案主要有采用主从同步过半机制,保证其主节点同步数据到从节点的数量超过一半才认定当前的操作为成功
其Redis主要侧重于AP,也就是可用性和分区,而ZK架构设计主要是侧重CP,更多偏向于数据的一致性问题,采用的是主从同步过半机制来判断每次的操作是否成功
红锁其实也借鉴了ZK中的过半机制,也就是说,红锁会想每个Redis服务发送加锁请求,只要有一般以上的Redis能加锁成功,那就说明其加锁成功,但是有一个问题点在于,如果单台的Redis服务设置了主从结构,那就存在何上面Redis锁失效一样的问题
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。