赞
踩
分布式锁的应用场景与分布式锁实现(一):传统锁处理并发及传统锁的问题
所有代码已同步到GitCode:https://gitcode.net/ruozhuliufeng/distributed-project.git
借助Redis中的命令setnx(key,value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他客户端返回0(false)。
改造StockService方法:
/** * 减库存 */ @Override public void checkAndLock() { // 加锁setnx Boolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "1"); // 重试:递归调用 if (!lock){ try { Thread.sleep(50); this.deduct(); } catch (InterruptedException e) { e.printStackTrace(); } } else { try { // 1. 查询库存信息 String stock = redisTemplate.opsForValue().get("stock").toString(); // 2. 判断库存是否充足 if (stock != null && stock.length() != 0) { Integer st = Integer.valueOf(stock); if (st > 0) { // 3.扣减库存 redisTemplate.opsForValue().set("stock", String.valueOf(--st)); } } } finally { // 解锁 this.redisTemplate.delete("lock"); } } }
其中,加锁也可以使用循环:
// 加锁,获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "1")){
try {
Thread.sleep(40);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
解锁:
this.redisTemplate.delete("lock");
重置缓存值,使用Jmeter进行压力测试,并获取库存余量:0
代码均已上传至GitCode,可根据提交信息获取文件的更改内容
]
问题:setnx刚刚获取到锁,当前服务器宕机,导致del释放锁无法执行,进而导致锁无法释放(死锁)
解决:给锁设置过期时间,自动释放锁
设置过期时间的两种方式:
问题:可能会释放其他服务器的锁
场景:如果业务逻辑的执行时间是7s。执行流程如下:
解决:setnx获取到锁时,设置一个指定的唯一值(例如:uuid),释放前获取这个值,判断是否是自己的锁。
实现如下:
问题:删除操作缺乏原子性
场景:
解决方案:没有一个命令可以同时做到判断+删除,所以只能通过其他方式实现(lua脚本)
lua脚本可以一次性发送多个指令给redis,由于Redis是单线程的,执行指令遵守one-by-one规则
Redis采用单线程架构,可以保证单个命令的原子性,但是无法保证一组命令在高并发场景下的原子性。例如:
在串行场景下:A和B的值肯定都是3
在并发场景下:A和B的值可能在0-6之间。
极限情况下1:
则A的结果是0,B的结果是3
极限情况下2:
则A和B的结果都是6。
如果Redis客户端通过lua脚本把3个命令一次性发送给redis服务器,那么这三个指令就不会被其他客户端指令打断。Redis也保证脚本会以原子性(atomic)的方式执行:当某个脚本正在运行的时候,不会有其他脚本或Redis命令被执行。这和使用MULTI/EXEC包围的事务很类似。
但是MULTI/EXEC方法来使用事务功能,将一组命令打包执行,无法进行业务逻辑的操作。这期间有某一条命令执行报错(例如给字符串自增),其他的命令还是会执行,并不会回滚。
lua是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
设计目的
其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
lua特性
这里不做深究,感兴趣可以到官方教程或菜鸟教程,这里以Redis中可能会用到的部分语法作介绍。
变量:
a = 5 -- 全局变量
local b = 10 -- 局部变量,redis只支持局部变量
a,b = 10,2*x -- 等价于 a = 10; b = 2*x
流程控制:
if(布尔表达式 1)
then
-- [ 在布尔表达式 1 为true时执行改语句块 ]
elseif(布尔表达式 2)
then
-- [ 在布尔表达式 2 为true时执行改语句块 ]
else
-- [ 在以上表达式都不为true时执行改语句块 ]
end
在Redis中需要通过eval命令执行lua脚本。
格式:
EVAL script numkeys key [key ...] arg [arg ...]
script: lua脚本字符串,这段lua脚本不需要(也不应该)定义函数
numkeys: lua脚本中keys数组的大小
key [key ...]: KEYS数组中的元素
arg [arg ...]: ARGV数组中的元素
EVAL "return 10" 0
# 输出:10
EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 5 10 20 30 40 50 60 70 80 90
# 输出:10 20 60 70
# 下标从1开始
EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 10 20
# 输出:0
EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 20 10
# 输出:1
传入了两个参数 10 和 20,KEYS的长度是1,所以KEYS中有一个元素10,剩下的一个20就是ARGV数组中的元素
redis.call()中的redis是redis中提供的lua脚本类库,仅在redis环境中使用该类库。
set a 10 -- 设置一个a 值为10
EVAL "return redis.call('get','a')" 0
# 通过return把call方法返回给redis客户端,打印:10
注意:**脚本里使用的所有键都应该由KEYS数组来传递。**但并不是强制性的,代价是这样写出的脚本不能被Redis集群所兼容。
EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 b 20
以上案例基本可以应付Redis分布式锁所需要的脚本知识了。
-- 当call()在执行命令的过程中发生错误时,脚本会停止运行,并返回一个脚本错误,输出错误信息
EVAL "return redis.call('sets', KEYS[1], ARGV[1]),redis.call('set', KEYS[2], ARGV[2])" 2 c d 20 30
- pcall函数不影响后续指令的执行
EVAL "return redis.pcall('sets',KEYS[1],ARGV[1]),redis.pcall('set' ,KEYS[2],ARGV[2])" 2 c d 20 30
注意:set方法写成了sets,肯定会报错。
删除lua脚本:
if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end
更新代码:
package tech.msop.distributed.lock.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.ScopedProxyMode; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.SessionCallback; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import tech.msop.distributed.lock.constants.StockConstant; import tech.msop.distributed.lock.entity.StockEntity; import tech.msop.distributed.lock.mapper.StockMapper; import tech.msop.distributed.lock.service.IStockService; import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** * 库存服务实现类 <br/> */ @Service @Slf4j public class StockServiceImpl extends ServiceImpl<StockMapper, StockEntity> implements IStockService { @Autowired private StringRedisTemplate redisTemplate; /** * 减库存 */ @Override public void checkAndLock() { String uuid = UUID.randomUUID().toString(); // 加锁 setnx while (!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)) { // 重试循环 try { Thread.sleep(30); } catch (InterruptedException e) { throw new RuntimeException(e); } } try { // 1. 查询库存信息 String stock = redisTemplate.opsForValue().get("stock"); // 2. 判断库存是否充足 if (stock != null && stock.length() != 0) { Integer st = Integer.valueOf(stock); if (st > 0) { // 3.更新到数据库 redisTemplate.opsForValue().set("stock", String.valueOf(--st)); } } } finally { String script = "if redis.call('get',KEYS[1]) == ARGV[1] " + "then " + " return redis.call('del',KEYS[1]) " + "else " + " return 0 " + "end"; redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"),uuid); } } }
进行压力测试并查询库存余量:0
以上加锁命令存在一个问题,由于加锁命令使用了SETNX,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行。
用一段Java代码解释可重入:
public synchronized void a(){
b();
}
public synchronized void b(){
// pass
}
假设X线程在a方法获取锁之后,继续执行b方法,如果此时不可重入,线程就必须等待锁释放,再次争抢锁。
锁明明是X线程拥有,却还需要等待自己释放锁,然后再去抢锁,这看起来就很奇怪,我释放我自己。
而可重入性就可以解决这个尴尬的问题,当线程拥有锁之后,往后再遇到加锁方法,直接将加锁次数加1,然后在执行方法逻辑。退出加锁方法之后,加锁此处再减1,当加锁次数为0时,锁才被真正的释放。
可以看到可重入锁的最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。
通过阅读JDK中的可重入锁源码,可知可重入锁ReentrantLock的加锁流程:ReentrantLock.lock) —> NonfairSync.lock() —> AQS.acquire(1) —> NonfairSync.tryAcquire(1) —> Sync.nonfairTryAcquire(1)
可重入锁的解锁流程:Reentrant.unlock() —> AQS.release(1) —> SyncRelease(1)
确定解决方案:Redis+hash
参照ReentrantLock中的非公平可重入锁实现分布式可重入锁:hash + lua脚本
Redis提供了Hash(哈希表)这种可以存储键值对数据结构。所以我们可以使用Redis Hash存储锁的重入次数,然后利用lua脚本判断逻辑。通过JDK的可重入锁分析,继续分析Redis中的加锁流程:
if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1)
then
redis.call('hincrby', KEYS[1], ARGV[1], 1);
redis.call('expire', KEYS[1], ARGV[2]);
return 1;
else
return 0;
end
-- key: lock
-- arg: uuid 30
假设值为:KEYS[lock],ARGV[uuid,expire]
如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁此处加1
分析Redis的解锁流程:
-- 判断hash set 可重入key的值是否等于0
-- 如果为 nil 代表 自己的锁不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减1
-- 如果为 1 代表 该可重入 key 解锁成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
return nil;
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then
return 0;
else
redis.call('del', KEYS[1]);
return 1;
end
-- key: lock
-- arg: uuid
由于后续会有基于Zookeeper和基于MySQL实现的分布式锁,我们可以通过工厂类,获取不同类型的分布式锁。
DistributedLockClient工厂类具体实现:
@Component public class DistributedLockClient { @Autowired private StringRedisTemplate redisTemplate; private String uuid; public DistributedLockClient() { this.uuid = UUID.randomUUID().toString(); } public DistributedRedisLock getRedisLock(String lockName){ return new DistributedRedisLock(redisTemplate, lockName, uuid); } }
DistributedRedisLock实现如下:
public class DistributedRedisLock implements Lock { private StringRedisTemplate redisTemplate; private String lockName; private String uuid; private long expire = 30; public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) { this.redisTemplate = redisTemplate; this.lockName = lockName; this.uuid = uuid; } @Override public void lock() { this.tryLock(); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { try { return this.tryLock(-1L, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return false; } /** * 加锁方法 * @param time * @param unit * @return * @throws InterruptedException */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { if (time != -1){ this.expire = unit.toSeconds(time); } String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " + "then " + " redis.call('hincrby', KEYS[1], ARGV[1], 1) " + " redis.call('expire', KEYS[1], ARGV[2]) " + " return 1 " + "else " + " return 0 " + "end"; while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getId(), String.valueOf(expire))){ Thread.sleep(50); } return true; } /** * 解锁方法 */ @Override public void unlock() { String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " + "then " + " return nil " + "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " + "then " + " return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end"; Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), getId()); if (flag == null){ throw new IllegalMonitorStateException("this lock doesn't belong to you!"); } } @Override public Condition newCondition() { return null; } /** * 给线程拼接唯一标识 * @return */ String getId(){ return uuid + ":" + Thread.currentThread().getId(); } }
在业务代码中使用:
package tech.msop.distributed.lock.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import tech.msop.distributed.lock.constants.StockConstant; import tech.msop.distributed.lock.entity.StockEntity; import tech.msop.distributed.lock.lock.DistributedLockClient; import tech.msop.distributed.lock.lock.DistributedRedisLock; import tech.msop.distributed.lock.mapper.StockMapper; import tech.msop.distributed.lock.service.IStockService; /** * 库存服务实现类 <br/> */ @Service @Slf4j public class StockServiceImpl extends ServiceImpl<StockMapper, StockEntity> implements IStockService { @Autowired private StringRedisTemplate redisTemplate; @Autowired private DistributedLockClient distributedLockClient; /** * 减库存 */ @Override public void checkAndLock() { DistributedRedisLock redisLock = this.distributedLockClient.getRedisLock("lock"); redisLock.lock(); try { // 1. 查询库存信息 String stock = redisTemplate.opsForValue().get("stock").toString(); // 2. 判断库存是否充足 if (stock != null && stock.length() != 0) { Integer st = Integer.valueOf(stock); if (st > 0) { // 3.扣减库存 redisTemplate.opsForValue().set("stock", String.valueOf(--st)); } } } finally { redisLock.unlock(); } } }
使用Jmet测试并查询库存余量:0
测试可重入性:
借助Timer定时器+lua脚本实现自动续期:
if (redis.call('hexists',KEYS[1],ARGV[1]) == 1)
then
return redis.call('expire',KEYS[1],ARGV[2])
else
return 0
end
-- key: lock
-- arg: uuid 30
修改Redis分布式锁,实现锁自动续期:
package tech.msop.distributed.lock.lock; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import java.util.Arrays; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * 基于Redis实现分布式锁 */ public class DistributedRedisLock implements Lock { private final StringRedisTemplate redisTemplate; private final String lockName; private final String uuid; private long expire = 30; public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) { this.redisTemplate = redisTemplate; this.lockName = lockName; this.uuid = uuid + ":" + Thread.currentThread().getId(); } @Override public void lock() { this.tryLock(); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { try { return this.tryLock(-1L, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return false; } /** * 加锁方法 * * @param time 超时时间 * @param unit 时间单位 * @return 加锁是否成功 * @throws InterruptedException 异常 */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { if (time != -1) { this.expire = unit.toSeconds(time); } String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " + "then " + " redis.call('hincrby', KEYS[1], ARGV[1], 1) " + " redis.call('expire', KEYS[1], ARGV[2]) " + " return 1 " + "else " + " return 0 " + "end"; while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) { Thread.sleep(50); } // 在加锁成功返回之前,开启定时器,自动续期 renewExpire(); return true; } /** * 解锁方法 */ @Override public void unlock() { String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " + "then " + " return nil " + "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " + "then " + " return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end"; Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid); if (flag == null) { throw new IllegalMonitorStateException("this lock doesn't belong to you!"); } } @Override public Condition newCondition() { return null; } /** * 给线程拼接唯一标识 * * @return 唯一标识 */ // String getId() { // return uuid + ":" + Thread.currentThread().getId(); // } private void renewExpire(){ String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " + "then " + " return redis.call('expire', KEYS[1], ARGV[2]) " + "else " + " return 0 " + "end"; new Timer().schedule(new TimerTask() { @Override public void run() { if (redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class),Arrays.asList(lockName),uuid,String.valueOf(expire))){ renewExpire(); } } },this.expire * 1000 /3); } }
在tryLock方法中使用:
构造方法作如下修改:
解锁方法作如下修改:
独占排他使用 setnx
防止死锁:设置锁的过期时间
原子性
防误删:解铃还须系铃人
可重入性:hash(key field value) + lua脚本
自动续期:Timer定时器 + lua脚本
在集群情况下,导致锁机制失效:
Redis集群状态下的问题:
安全失效!
解决集群下锁失效,参照redis官方网站针对redlock文档:https://redis.io/topics/distlock
在算法的分布式版本中,我们假设有N个Redis服务器。这些节点是完全独立的,因此我们不使用复制或任何其他隐式协调系统。在前面已经描述了如何在单个实例中安全地获取和释放锁,在分布式锁算法中,将使用相同的方法在单个实例中获取和释放锁。 将N设置为5是一个合理的值,因此需要在不同的计算机或虚拟机上运行5个Redis主从服务器,确保它们以独立的方式发生故障。
为了获取锁,客户端执行以下操作:
每台计算机都有一个本地时钟,我们通常可以依靠不同的计算机产生很小的时钟漂移。只有在拥有锁的客户端将在锁的有效时间内(如步骤3中获得的)减去一段时间(仅几毫秒)的情况下终止工作,才能保证这一点。以补偿进程之间的时钟漂移。
当客户端无法获取锁时,它应该在随机延迟后重试,以避免同时获取同一资源的多个客户端之间不同步(这可能会导致脑裂的情况:没人胜)。同样,客户端在大多数Redis实例中尝试获取锁的速度越快,出现裂脑情况(以及需要重试)的窗口就越小,因此在理想情况下,客户端应尝试将SET命令发送到N个实例同时使用多路复用。
值得强调的是,对于未能获得大多数锁的客户端,尽快释放(部分)获得的锁有多么重要,这样就不必等待锁定期满才能再次获得锁(但是,如果发生了网络分区,并且客户端不再能够与Redis实例进行通信,则在等待密钥到期时需要付出可用性损失)。
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet、Set、Multimap、SortedSet、Map、List、Queue、BlockingQueue、Semaphore、Lock、AtomicLong、CountDownLatch、Publish/Subscribe、Bloom filter、Remote Service、Spring cache、Executor Service、Live Object Service、Scheduler Service)。Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
官方文档地址:https://github.com/redisson/redisson/wiki
基于Redis的Redisson分布式可重入锁RLock
Java对象实现了java.util.concurrent.locks.Lock
接口。
众所周知,如果负责存储这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson示例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间是30秒钟,也可以通过吸怪Config.lockWatchdogTimeout
来另行指定。
RLock
对象完全符合Java的Lock规范。也就是说只有锁的进程才能解锁,其他进程则会抛出IllegalMonitorStateException
错误。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间锁便自动解开了。
RLock lock = redisson.getLock("anyLock"); // 最常见的使用方法 lock.lock(); // 加锁以后10秒钟自动解锁 // 无需调用unlock方法手动解锁 lock.lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }
<!-- Redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.21.3</version>
</dependency>
package tech.msop.distributed.lock.config; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Redisson 配置 */ @Configuration public class RedissonConfig { /** * Redisson 客户端配置 * * @return Redisson客户端 */ @Bean public RedissonClient redissonClient() { // 初始化配置对象 Config config = new Config(); // 单机Redis服务 config.useSingleServer() .setAddress("redis://127.0.0.1:6379") // redis服务地址,必须 redis://ip:port // .setDatabase(0) // 指定Redis数据库编号 // .setUsername("") // redis 用户名 // .setPassword("")// redis 密码 // .setConnectionMinimumIdleSize(10)// 连接池最小空闲连接数 // .setConnectionPoolSize(50) // 连接池最大线程数 // .setIdleConnectionTimeout(60000) // 线程超时时间 // .setConnectTimeout() // 客户端获取redis 链接的超时时间 // .setTimeout()// 响应超时时间 ; return Redisson.create(config); } }
package tech.msop.distributed.lock.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import tech.msop.distributed.lock.constants.StockConstant; import tech.msop.distributed.lock.entity.StockEntity; import tech.msop.distributed.lock.lock.DistributedLockClient; import tech.msop.distributed.lock.lock.DistributedRedisLock; import tech.msop.distributed.lock.mapper.StockMapper; import tech.msop.distributed.lock.service.IStockService; /** * 库存服务实现类 <br/> */ @Service @Slf4j public class StockServiceImpl extends ServiceImpl<StockMapper, StockEntity> implements IStockService { @Autowired private StringRedisTemplate redisTemplate; @Autowired private DistributedLockClient distributedLockClient; @Autowired private RedissonClient redissonClient; /** * 减库存 */ @Override public void checkAndLock() { RLock lock = redissonClient.getLock("lock"); lock.lock(); try { // 1. 查询库存信息 String stock = redisTemplate.opsForValue().get("stock").toString(); // 2. 判断库存是否充足 if (stock != null && stock.length() != 0) { Integer st = Integer.valueOf(stock); if (st > 0) { // 3.扣减库存 redisTemplate.opsForValue().set("stock", String.valueOf(--st)); } } } finally { lock.unlock(); } } }
基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock
接口的一种RLock
对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会在等待5秒后继续下一个线程,也就是说如果前面5个线程都处于等待状态,那么后面的线程会等待至少25秒。
RLock fairLock = redissonClient.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock();
基于Redis的Redisson分布式联锁RedissonMultiLock
对象可以将多个RLock
对象关联为一个联锁,每个RLock
对象实例可以来自于不同的Redisson实例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();
基于Redis的Redisson红锁RedissonRedLock
对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock
对象关联为一个红锁,每个RLock
对象实例可以来自于不同的Redisson实例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
基于Redis的Redisson分布式可重入读写锁RReadWriteLock
Java对象实现了java.util.concurrent.locks.ReadWriteLock
接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock"); // 最常见的使用方法 rwlock.readLock().lock(); // 或 rwlock.writeLock().lock(); // 10秒钟以后自动解锁 // 无需调用unlock方法手动解锁 rwlock.readLock().lock(10, TimeUnit.SECONDS); // 或 rwlock.writeLock().lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS); // 或 boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();
添加StockController方法:
@GetMapping("test/read")
public String testRead(){
String msg = stockService.testRead();
return "测试读";
}
@GetMapping("test/write")
public String testWrite(){
String msg = stockService.testWrite();
return "测试写";
}
添加StockService方法:
public String testRead() { RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock"); rwLock.readLock().lock(10, TimeUnit.SECONDS); System.out.println("测试读锁。。。。"); // rwLock.readLock().unlock(); return null; } public String testWrite() { RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock"); rwLock.writeLock().lock(10, TimeUnit.SECONDS); System.out.println("测试写锁。。。。"); // rwLock.writeLock().unlock(); return null; }
打开开两个浏览器窗口测试:
基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore
采用了与java.util.concurrent.Semaphore
相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.trySetPermits(3);
semaphore.acquire();
semaphore.release();
在StockController添加方法:
@GetMapping("test/semaphore")
public String testSemaphore(){
this.stockService.testSemaphore();
return "测试信号量";
}
在StockService添加方法:
public void testSemaphore() {
RSemaphore semaphore = this.redissonClient.getSemaphore("semaphore");
semaphore.trySetPermits(3);
try {
semaphore.acquire();
TimeUnit.SECONDS.sleep(5);
System.out.println(System.currentTimeMillis());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
添加测试用例:并发10次,循环一次
控制台效果:
控制台1:
1606960790234
1606960800337
1606960800443
1606960805248
控制台2:
1606960790328
1606960795332
1606960800245
控制台3:
1606960790433
1606960795238
1606960795437
由此可知:
1606960790秒有3次请求进来:每个控制台各1次
1606960795秒有3次请求进来:控制台2有1次,控制台3有2次
1606960800秒有3次请求进来:控制台1有2次,控制台2有1次
1606960805秒有1次请求进来:控制台1有1次
基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch
采用了与java.util.concurrent.CountDownLatch
相似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
需要两个方法:一个等待,一个计数countDown
给StockController添加测试方法:
@GetMapping("test/latch")
public String testLatch(){
stockService.testLatch();
return "班长锁门。。。";
}
@GetMapping("test/countdown")
public String testCountDown(){
stockService.testCountDown();
return "出来了一位同学";
}
给StockService添加测试方法:
public void testLatch() {
RCountDownLatch cdl = this.redissonClient.getCountDownLatch("cdl");
cdl.trySetCount(6);
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void testCountDown() {
RCountDownLatch cdl = this.redissonClient.getCountDownLatch("cdl");
cdl.countDown();
}
重启测试,打开两个页面:当第二个请求执行6次之后,第一个请求才会执行。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。