赞
踩
每个店铺都可以发布优惠券:
当用户抢购时,就会生成订单并保存到 tb_voucher_order 这张表中,而订单表如果使用数据库自增ID就存在一些问题:
id自增的问题:
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般满足下列特性:
利用Redis的incr命令,为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其他信息---数值类型,java里的Long类型,8个字节共64位
ID的组成部分:(时间相同的情况下,序列号不一样)
utils包下定义一个类 RedisIdWorker (基于Redis的Id生成器)
注意:该自增是利用Redis的自增方法,我们位移后获取的是二进制因为我们左移了,32+32,二进制的低32位就是我们的count,高32位就是时间戳,如果高并发下时间戳一样,看起来就是自增的,如:
如:
- @Component
- @Slf4j
- public class RedisIdWorker {
- /**
- * 开始时间戳
- */
- private static final long BEGIN_TIMESTAMP = 1640995200L;
-
- /**
- * 序列号位数
- */
- private static final int COUNT_BITS = 32;
-
- private StringRedisTemplate stringRedisTemplate;
-
- public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
- this.stringRedisTemplate = stringRedisTemplate;
- }
-
- /**
- * 生成全局ID
- */
- public long nextId(String keyPrefix) {
- /**
- * 生成时间戳
- * 31位的数字,单位秒,他的值是要有一个 基础的时间 作为开始时间
- * 时间戳(秒数) = 当前时间 - 基础时间,
- * 即:从开始时间 隔了多少秒
- */
- LocalDateTime now = LocalDateTime.now();
- long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
- // 当前时间 - 基础时间 = 时间戳
- long timestamp = nowSecond - BEGIN_TIMESTAMP;
-
- /**
- * 生成序列号
- * 利用Redis的自增长 用字符串结构 默认一次自增 1,
- * 不同的业务有不同的key,
- * stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":");
- * 这样不可以:
- * 1、这样的话默认就是整个订单业务都是一个key,不管过了多少年,随着业务的订单越来越多
- * ,而redis 单个key的自增长是有上限的,是2的64次方,虽然大也是有上限的
- * 2、而且key里边真正用来记录序列号的,只有32个bit位,如果说将来key,超过了32个bit,那就存不下了。
- * 所以,哪怕是同一个业务,也不能使用同一个key
- * 解决:
- * 我们可以在后边拼一个当期日期 比如20220910,到了第二号,就是一个新的key了20220911
- * 这样就还有一个统计的效果
- */
- // 获取当前日期,自定义格式化, 这样就还可以统计 年 月 日的单
- String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
-
- // 这里用基本类型,后面要做运算,如果key不存在,会自动创建的,不会有空指针
- // 注意,插到库里这个key 是一个,value就是count 回覆盖之前
- long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
- log.info("count={}", count);
- /**
- * 拼接并返回,利用位运算,
- * 前面说了就当做long类型8个字节,
- * 全局id = 0 + 时间戳 + 序列号
- * 将时间的戳的值左移32位,然后空出来的32位,
- * 利用 | 运算去将序列号填充即可
- *
- * 或运算| 一个为真即为真,现在后面的32位都是0,
- * 而count的值 可能为0 可能为1,我们希望不管为0还是1都需要填充到后32位
- * 0|0 = 0
- * 0|1 = 1
- * 即:count值将来是什么,后32位就保留什么了
- */
- return timestamp << COUNT_BITS | count;
- }
-
- /**
- * 计算基础(当前)时间秒数
- */
- public static void main(String[] args) {
- // of 方法可以指定年月日
- LocalDateTime now = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
- // 接收时期
- long second = now.toEpochSecond(ZoneOffset.UTC);
- System.out.println(second);
- }
- }
注意:每天一个key,方便我们去统计每天的订单量
全局唯一ID生成策略:
Redis自增ID策略:(Redis保存key需要注意)
测试类:
- private ExecutorService es = Executors.newFixedThreadPool(500);
-
- @Test
- void testIdWorker() throws InterruptedException {
- CountDownLatch latch = new CountDownLatch(300);
-
- Runnable task = () -> {
- for (int i = 0; i < 100; i++) {
- long id = redisIdWorker.nextId("order");
- System.out.println("id = " + id);
- }
- latch.countDown();
- };
- long begin = System.currentTimeMillis();
- for (int i = 0; i < 300; i++) {
- es.submit(task);
- }
- latch.await();
- long end = System.currentTimeMillis();
- System.out.println("time = " + (end - begin));
- }
实现优惠券秒杀下单
在VoucherController中提供了一个接口,可以添加秒杀优惠券;
http://localhost:8081/voucher/seckill
添加秒杀券也是优惠券,只不过在t_voucher实体类里已经把秒杀券的信息拿到了都,
- {
- "shopId":1,
- "title":"100元代金券",
- "subTitle":"周一至周五均可使用",
- "rules":"全场通用\\n无需预约\\n可无限叠加\\不兑现、不找零\\n仅限堂食",
- "payValue":8000,
- "actualValue":10000,
- "type":1,
- "stock":100
- }
2、实现下单接口,完成抢购功能
实现优惠券秒杀的下单功能:
下单时需要判断两点
基本功能实现:
- @Resource
- private ISeckillVoucherService seckillVoucherService; // 秒杀券的业务层
-
- @Resource
- private IVoucherOrderService voucherOrderService; // 订单业务层
-
- @Resource
- private RedisIdWorker redisIdWorker; // 全局区域ID工具类
-
- /**
- * 当有两张表以上的操作时,要加上事务,出现问题会及时回滚,
- * 否则无法回滚的
- * @param voucherId
- * @return
- */
- @Override
- @Transactional
- public Result seckillVoucher(Long voucherId) {
- // 1、查询优惠券
- SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
-
- // 2、判断秒杀是否开始
- if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
- return Result.fail("秒杀活动尚未开始");
- }
-
- // 3、判断秒杀是否结束 结束时间在当前时间值前
- if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
- return Result.fail("秒杀活动已经结束了");
- }
-
- // 4、判断库存是否充足
- if (voucher.getStock()<1) {
- return Result.fail("库存不足");
- }
-
- // 5、扣减库存
- boolean isUpdate = seckillVoucherService.update().setSql("stock = stock-1").eq("voucher_id", voucherId).update();
- if (!isUpdate) {
- return Result.fail("库存不足");
- }
- // 6、创建订单
- VoucherOrder voucherOrder = new VoucherOrder();
-
- // 6.1 订单id
- long orderId = redisIdWorker.nextId("order");
- voucherOrder.setId(orderId);
-
- // 6.2 用户id
- Long userId = UserHolder.getUser().getId();
- voucherOrder.setUserId(userId);
-
- // 6.3 优惠券id
- voucherOrder.setVoucherId(voucherId);
-
- // 6.4 保存订单信息
- voucherOrderService.save(voucherOrder);
-
- // 7、返回订单 id
- return Result.ok(orderId);
- }
以上代码有超卖问题
利用JMeter模拟高并发
/voucher-order/seckill/10
注意:这里我们需要在JMeter里加一个请求头,即:用户token的请求头,否则拦截器过不去,redis里找的:authorization:6a44cf0d7b564030906ad8ed9577285c
模拟结果,order表出现109条数据,库存是-9,
这样就出现了问题,我们要卖100个券,实际却卖了109张券
出现抢占资源的(并发安全)问题
加锁:悲观锁&乐观锁,这两个所只是一种理念
悲观锁:还有像数据库的互斥锁也是悲观锁
乐观锁:比如我查DB的优惠券库存,要更新了,更新之前我去判断一下,有没有别人修改库存,
修改代码:然后继续执行JMeter,我们发现并不好用,库存只卖出了21件,订单也是21个
- /**
- * 扣减库存
- * 修改代码,添加乐观锁,CAS机制
- * 添加where条件让stock等于我们上边查到的stock值
- * 如果查到了,说明没人修改,如果没查到说明有人修改了
- * 光加这一个的话,库存并没有到0,数据库还有79个
- */
- boolean isUpdate = seckillVoucherService.update()
- .setSql("stock=stock-1")
- .eq("voucher_id", voucherId)
- .eq("stock", seckillVoucher.getStock())
- .update();
券还没卖完,就返回没有库存了,这就是设计乐观锁的一个弊端,当一个线程修改成功了,其余线程也在执行,发现没有查到,就修改失败了,这就浪费了一次线程任务,虽然修改失败了,但是并没有线程安全问题,我们需要做处理,不用不等于就报错,可以直接让stock只要大于0就好
- / * 我们修改一下,只要stock>0即可,不一定要必须等于,ge大于的意思
-
- */
- boolean isUpdate = seckillVoucherService.update()
- .setSql("stock=stock-1")
- .eq("voucher_id", voucherId)
- .gt("stock", 0)
- .update();
解决办法就在tb_voucher_order里userId和VoucherId 联合查询,就能确定是同一个用户
解决思路:
代码片段:
- Long userId = UserHolder.getUser().getId();
- /**
- * 新添加一人一单
- */
- int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
- if (count>0) {
- return Result.fail("用户已经买过了"+count+"次!");
- }
-
- /**
- * 以下的就继续走
- */
- boolean isFlag = seckillVoucherService.update().
- setSql("stock = stock-1").
- eq("voucher_id", seckillVoucher.getVoucherId())
- .gt("stock", 0)
- .update();
- if (!isFlag){
- return Result.fail("库存不足");
- }
跑批后发现数据库数据没有对上,200个线程,同一用户,却跑出了10个订单,以前是一人下100单,现在是一人下了10单,问题依旧存在。即:多线程并发的情况下,上述代码,大家都去查询,查询的count都是0,然后都往下走,去创建订单了,这就又出现多线程并发问题
注意: 这里不能用乐观锁了,乐观锁是在更新数据的时候用的,而我们这块的代码是新增,只能用悲观锁了
而锁的对象是不应该是this整个对象,而应该是一人一锁,即:是用户ID
- public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
- @Resource
- private ISeckillVoucherService seckillVoucherService;
- @Resource
- private RedisIdWorker redisIdWorker;
-
- @Override
- public Result seckillVoucher(Long voucherId) {
- /*
- 1、判断是否开始
- 2、判断是否结束
- 3、判断是否有库存
- 4、下单
- 5、减库存
- 6、返回
- */
- SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
- if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
- return Result.fail("活动尚未开始");
- }
- if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
- return Result.fail("活动已结束");
- }
- Integer stock = seckillVoucher.getStock();
- if (stock<1) {
- return Result.fail("库存不足");
- }
-
- /**
- * 每个人,即:每个用户自己有独立的一把锁,
- * 但是,要知道每次请求来的时候,我们创建的这个userId都是一个全新的对象,因此对象变了,锁也就没有意思了。
- * 要求值是一样的,所以我们toString,转成字符串,锁的是值
- * 而toString的底层代码里,也是return new String(buf, true); new了一个字符串
- * 所以每调一次toString,也是一个全新字符串对象,锁对象也还是会变的,
- * 每次来,虽然都是1011,即使转成字符串也是一个全新的对象,还是不可以,所以我们调用
- * 调用toString.intern() 方法,
- * 简单理解为去常量池里查,如果字符串常量池里有一个equals比较为true的,那就返回池子
- * 中的字符串,不会new新的字符串了,这样锁的就是同一个用户了,而不同的用户就不会被锁定,这样性能就提高了
- * 但是,有个问题,
- * 我们开启事务开始执行,执行之后,先释放锁,才会提交事务,
- * 而事务是有spring管理的
- * 就是这个函数方法执行完后,由spring去做提交,而这个锁在 synchronized{} 大括号结束后已经释放了,
- * 锁释放了,就意味着其他线程可以进来了,而此时事务尚未提交,那有其他线程进来查询操作,我们新增的这个订单可能还没有写入数据库
- * 这个时候别的线程去查询可能依然不存在,存在并发安全问题,因此在里边锁的情况,锁的范围就有点小了,
- * 应该是事务提交之后我们再去释放锁,我们应该把整个方法锁起来
- *
- * 事务失效:非public 目标对象
- * 我们的事务是在另一个方法开启的,而不是在调用它的方法开启的,这就会导致spring管理的事务失效
- * 在方法里调用别的方法,会导致事务失效,即:目标对象,而不是代理对象了
- *
- * 这里需要去拿到事务的代理对象才可以
- * 借助一个API
- *
- */
-
- // return createVoucherOrder(voucherId);
- Long userId = UserHolder.getUser().getId();
- synchronized (userId.toString().intern()) {
- /**
- * 借助一个API 通过这个方法去拿到当前对象的代理对象 我们称为 普绕SEI,
- * 而 当前代理对象就是他的service - > IVoucherOrderService
- */
-
- // 这样我们就拿到了当前代理对象了
- // 获取代理对象,和事务有关的代理对象,然后再去调用方法函数就没有问题了
- IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
- /*
- 用代理对象去调用该函数,而不是用this调用,这样的话这个函数就会被spring管理了
- 因为proxy 这个对象是由spring创建的,所以proxy.createVoucherOrder2(voucherId, userId);
- 他是带有事务的这样一个函数
-
- 函数不存在的原因是因为IVoucherOrderService接口不存在这个函数,我们创建就好了
- 我们是在实现类里做的,我们创建一下就好了,
- IVoucherOrderService接口有了,我们才能基于接口去做调用
-
- 现在我们的事务才能生效
-
- 这么做的话还需要做两件事
- 1、新添加依赖,aspectj包下的aspectjweaver 这样一依赖,(动态代理的模式)
- 2、启动类添加一个注解:去暴露这个代理对象:
- @EnableAspectJAutoProxy(exposeProxy = true) 默认值是false
- 将默认值改为true,false是不会暴露的,不暴露的去获取是获取不到的,
-
- 一但暴露设置好了,我们这样就可以拿到代理对象了
- IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
-
- */
- return proxy.createVoucherOrder2(voucherId, userId);
- }
- }
-
- @Transactional
- public Result createVoucherOrder1(Long voucherId, Long userId) {
- /**
- * 每个人,即:每个用户自己有独立的一把锁,
- * 但是,要知道每次请求来的时候,我们创建的这个userId都是一个全新的对象,因此对象变了,锁也就没有意思了。
- * 要求值是一样的,所以我们toString,转成字符串,锁的是值
- * 而toString的底层代码里,也是return new String(buf, true); new了一个字符串
- * 所以每调一次toString,也是一个全新字符串对象,锁对象也还是会变的,
- * 每次来,虽然都是1011,即使转成字符串也是一个全新的对象,还是不可以,所以我们调用
- * 调用toString.intern() 方法,
- * 简单理解为去常量池里查,如果字符串常量池里有一个equals比较为true的,那就返回池子
- * 中的字符串,不会new新的字符串了,这样锁的就是同一个用户了,而不同的用户就不会被锁定,这样性能就提高了
- * 但是,有个问题,
- * 我们开启事务开始执行,执行之后,先释放锁,才会提交事务,
- * 而事务是有spring管理的
- * 就是这个函数方法执行完后,由spring去做提交,而这个锁在 synchronized{} 大括号结束后已经释放了,
- * 锁释放了,就意味着其他线程可以进来了,而此时事务尚未提交,那有其他线程进来查询操作,我们新增的这个订单可能还没有写入数据库
- * 这个时候别的线程去查询可能依然不存在,存在并发安全问题,因此在里边锁的情况,锁的范围就有点小了,
- * 应该是事务提交之后我们再去释放锁,我们应该把整个方法锁起来
- * 即:我们只有 先获取锁-> 提交事务 -> 再释放锁 这样一个操作,才能保证线程安全问题
- */
-
-
- /**
- * 新添加一人一单
- */
- int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
- if (count>0) {
- return Result.fail("用户已经买过了"+count+"次!");
- }
-
- /**
- * 以下的就继续走
- */
- boolean isFlag = seckillVoucherService.update().
- setSql("stock = stock-1").
- eq("voucher_id", voucherId)
- .gt("stock", 0)
- .update();
- if (!isFlag){
- return Result.fail("库存不足");
- }
-
- VoucherOrder voucherOrder = new VoucherOrder();
- long orderId = redisIdWorker.nextId("order");
- voucherOrder.setId(orderId);
- voucherOrder.setUserId(userId);
- voucherOrder.setVoucherId(voucherId);
- save(voucherOrder);
- return Result.ok(orderId);
- }
-
- @Transactional
- public Result createVoucherOrder2(Long voucherId, Long userId) {
- synchronized (userId.toString().intern()) {
- /**
- * 新添加一人一单
- */
- int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
- if (count>0) {
- return Result.fail("用户已经买过了"+count+"次!");
- }
-
- /**
- * 以下的就继续走
- */
- boolean isFlag = seckillVoucherService.update().
- setSql("stock = stock-1").
- eq("voucher_id", voucherId)
- .gt("stock", 0)
- .update();
- if (!isFlag){
- return Result.fail("库存不足");
- }
-
- VoucherOrder voucherOrder = new VoucherOrder();
- long orderId = redisIdWorker.nextId("order");
- voucherOrder.setId(orderId);
- voucherOrder.setUserId(userId);
- voucherOrder.setVoucherId(voucherId);
- save(voucherOrder);
- return Result.ok(orderId);
- }
- }
1、方法内调方法,非代理方法
2、异常,只有运行时异常 和 error可以
3、Spring 事务失效的 8 大场景,看看你都遇到过几个?_oh LAN的博客-CSDN博客_spring事务失效的场景
一人一单的并发安全问题
service下,ctrl+D 添加一个启动类,模拟集群模式下并发访问,
集群模式下,不同的JVM锁监视器是不同对,锁对象自然也就不同了,即使是同一个值也不行
postMan两个一样的请求 http://localhost:8080/api/voucher/list/10
结果都进到锁里了,这显然是不对的
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
如何让多个JVM在集群或分布式下实现同一把锁?那就是分布式锁!
解决办法,由JVM内部锁监视器,变为同有的外部锁监视器,即:让多进程可见同一个监视器对象
什么是分布式锁:?
redis、mysql、zookeeper
如何实现互斥:
高可用:
高性能:
安全性:
可用性上和性能来讲 redis更好。
安全性上zookeeper更好。
实现分布式锁需要实现的两个方法:
1、获取锁 (jdk获取锁又两种机制,一种是阻塞等待,等待有人释放锁为止,另一种是阻塞结束返回 一个结果,不等待。阻塞对CPU浪费,实现麻烦。这里我们利用非阻塞的)
2、释放锁
基于Redis实现分布式锁初级版本
需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能
尝试获取锁:因为我们采用非阻塞的方法,不会用去不断重试,也不会阻塞
实现类:利用构造方法去获取参数
- public class SimpleRedisLock implements ILock {
-
- private StringRedisTemplate stringRedisTemplate;
- private String name; // lockKey
- private static final String PRE_KEY = "lock:"; // lockKey前缀
-
- public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
- this.stringRedisTemplate = stringRedisTemplate;
- this.name = name;
- }
-
- @Override
- public boolean tryLock(long timeoutSec) {
- // 这里我们用线程的id 作为value
- long threadId = Thread.currentThread().getId();
-
- // 利用setnx ex:timeoutSec 超时释放时间 来实现互斥
- Boolean isFlag = stringRedisTemplate.opsForValue().setIfAbsent(PRE_KEY + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
-
- //这里isFlag接收的是包装类,而返回类型是非包装,防止null,报的空指针,这里用个True,
- // true 为 true false和null 都返回false
- return Boolean.TRUE.equals(isFlag);
- }
-
- @Override
- public void unLock() {
-
- // 删除key 释放锁
- stringRedisTemplate.delete(PRE_KEY + name);
- }
- }
修改代码锁的逻辑,VoucherOrderServiceImpl类
由synchronized单体锁,变为Redis的setnx锁实现分布式锁
- Long userId = UserHolder.getUser().getId();
- // 单体锁
- /*synchronized (userId.toString().intern()) {
- IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
- return proxy.createOrder(voucherId, userId);
- }*/
-
- // redis分布式锁,实现分布式或集群下的多线程一人一单 用userId
- SimpleRedisLock simpleRedisLock = new SimpleRedisLock(stringRedisTemplate, "order:"+userId);
- boolean lock = simpleRedisLock.tryLock(5);
-
- // false 要么失败,要么重试,一人一单这里抢购显然是用失败
- if (!lock) {
- return Result.fail("不能重复抢购");
- }
-
- // 释放锁在 finally
- try {
- IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
- return proxy.createOrder(voucherId, userId);
- } finally {
- // 释放锁
- simpleRedisLock.unLock();
- }
大多数情况下setnx锁都能实现互斥,但是在极端情况下,依然会存在问题!
比如线程1拿到锁后,业务进入阻塞状态中
总结来说:
1、添加UUID,每个tomact的UUID是不一样的,一个服务是一样的,静态static
- // 用hutool的 true 是把UUID 的 - 去掉,然后再拼接一个中划线-来区分线程ID
- private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";
2、添加标识
- @Override
- public boolean tryLock(long timeoutSec) {
- // 拼接
- System.out.println(ID_PREFIX);
- String threadId = ID_PREFIX + Thread.currentThread().getId();
-
- // 改造 直接把拼接的作为线程标识存进去就好了
- Boolean isFlag = stringRedisTemplate.opsForValue().setIfAbsent(PRE_KEY + name, threadId, timeoutSec, TimeUnit.SECONDS);
-
- //这里isFlag接收的是包装类,而返回类型是非包装,防止null,报的空指针,这里用个True,
- // true 为 true false和null 都返回false
- return Boolean.TRUE.equals(isFlag);
- }
3、释放锁,虽然是静态的,但是因为是集群模式下,每个服务的UUID是不同的
- @Override
- public void unLock() {
- // 改造
- // 获取标识
- String threadId = ID_PREFIX + Thread.currentThread().getId();
-
- // 获取线程的value
- String id = stringRedisTemplate.opsForValue().get(PRE_KEY + name);
-
- /**
- * 如果标识 与 id 一致,则释放做,反之不管
- * 虽然uuid是静态的全局就一份,但是我们是在集群模式下,每个服务器的静态都不一样
- * 比如tomact1 的UUID可能永远都是123
- * 而tomact2的UUID可能永远都是456,这样就很明显的区分了标识
- */
- if (threadId.equals(id)) {
- stringRedisTemplate.delete(PRE_KEY + name);
- }
如果线程1超时被清除了,当线程二进来了并且加锁后,此时线程1执行完任务,线程一要释放锁,然后线程一发现,这是啥?这不是我的锁~,线程1就什么都不做了就
线程2未超时,执行完发下自己的标识与value相等,是自己的锁,则直接释放
以上就是解决分布式锁误删问题,让我们的分布式锁更加的健壮了!
在某些极端的情况下依然会有问题
比如线程1获取锁,执行完业务,获取锁的标识,发现一致,然后要去释放锁,然而,就在此时,要去释放锁的中间,发生了阻塞(比如GC GC执行的时候,其实是会阻塞我们的所有代码的),一但发生阻塞了,那我就没有去释放,如果阻塞的时间足够长,就很有可能触发我们的超时时间,
搞笑的来了,此时线程2进来了,开始执行任务,突然阻塞结束了,线程一开始去释放,此时判断已经执行过了,它认为锁还是自己的,直接执行释放锁了,就把线程二的锁给释放掉了,又执行了误删
此时,又来了个线程3,获取锁成功,执行自己的任务,又出现了并发安全问题
原因:判断锁标识,和释放锁 是两个动作,这两个动作之间产生了阻塞,最后出现了问题
解决:判断锁标识 和 释放锁 的动作,必须保证原子性,即:一起执行,不能出现间隔
首先Redis也是有事务的,但是,Redis能够保证事务的原子性,但是不能保障一致性
Redis的Lua脚本:
Redis提供了Lua脚本功能,在一个脚本中表写多条Redis命令,(Redis就会一次性去执行他们)确保这多条命令执行时的原子性。Lua是一种编程语言
2、Lua脚本多参数实现:Lua语言的下表是从1开始的
删除成功return 1,失败renturn 0,这样一个Lua脚本就写完了,local代表声明一个参数,if的大括号没有了,用then表示,结束用end表示
简化后的脚本,返回值为1,或0
- if (redis.call('get', keys[1]) == argv[1]) then
- return redis.call('del', keys[1])
- end
- return 0
需求:基于Lua脚本实现分布式锁的释放锁逻辑
提示:RedsiTimeplate调用Lua脚本的API如下
项目里定义一个Lua->unLock.lua 脚本,不要写死
- -- 比较线程标识与锁中的标识是否一致
- if(redis.call('get', KEYS[1]) == ARGV[1]) then
- -- 释放锁
- return redis.call('del', KEYS[1])
- end
- return 0
- /*
- 初始化Lua脚本,因为用的静态,只要这个类一加载,初始化就完成了,这样就不用每次释放锁去加载了,性能就提高了
- RedisScript是个接口,里边有个实现是 DefaultRedisScript,泛型是个返回值类型,我们这里用long 本来就是 1 和 0
- */
- private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
- static {
- /*
- 括号会接收一个脚本,ctrl+q 可以查看构造方法接收类型
- 我们可以看出来接收的是一个字符串,就是把文件里的内容当做字符串,这种模式就硬编码,不建议这么做
- 我们就放文件里,所以不去管他,传空
- 我们调用方法去设置脚本位置
- */
- UNLOCK_SCRIPT = new DefaultRedisScript<>();
-
- /*
- 接收参数 Resource,我们用spring提供的一个ClassPathResource,
- 就是在classpath下的一些资源
- 我们创建的unLock.lua 在resources文件下,就在classpath下,直接指定
- */
- UNLOCK_SCRIPT.setLocation(new ClassPathResource("unLock.lua"));
-
- /*
- * 还可以配置一下返回值类型
- */
- UNLOCK_SCRIPT.setResultType(Long.class);
- }
修改SimpleRedisKey 的释放锁方法,脚本有Redis帮我们执行,保证原子性(判断和释放的原子性)
- @Override
- public void unLock() {
- /*
- 調用Lua脚本,写到一个文件中 unLock.lua
- 参数:RedisScript 得去加载我们的Lua文件,不应该在释放锁的时候去读取
- 这里设置成静态的,每次执行都去读文件,产生IO流,效果不好,性能差
-
- 注意,key是传的集合,这里用集合的工具类,单元素的集合
- 这里不用管返回值了,释放锁嘛,成功就成功了,如果没有成功,就被别人释放了,或超时释放
-
- 这段脚本是由Redis帮我们执行的,保证原子性
- */
- stringRedisTemplate.execute(UNLOCK_SCRIPT
- ,Collections.singletonList(PRE_KEY + name)
- ,ID_PRE + Thread.currentThread().getId());
- }
总结:到此为止,已经实现了一个生产可用的基于Redis的分布式锁
大多数场景下,我们之前实现的锁都够用了,以上4个问题,大多数情况下,也基本不会去管,如果说对锁的要求非常高,那就必须解决这4个问题
我们可以利用一个框架去解决这4个功能:Redisson
嗯。。。我们以后使用分布式锁的时候,可以就使用这个开源的框架,Redisson什么都有
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson</artifactId>
- <version>3.13.6</version>
- </dependency>
注意:tryLock();第一个参数是获取锁的最大等待时间比如1秒,期间会重试,如果1秒后还没有或得锁,那就回返回false
改造下单业务
Redisson的tryLock();有三种构造,是线程安全的
- @Configuration
- public class RedissonConfig {
-
- @Bean
- public RedissonClient redissonClient() {
- Config config = new Config();
- config.useSingleServer().setAddress("redis://localhost:6379");
- return Redisson.create(config);
- }
- }
- -- 注入上边工具类里写的配置Redissclient
- @Resource
- private RedissonClient redissonClient;
修改VoucherOrderServiceImpl下单的实现类,经测试,它是线程安全的
- /**
- * 这两个先注释,用Redisson的
- */
- //SimpleRedisKey lock = new SimpleRedisKey(stringRedisTemplate, "order:"+userId);
- //boolean islock = simpleRedisKey.tryLock(5L);
-
- //利用我们配置的Redisson的锁方法
- RLock lock = redissonClient.getLock("order:" + userId);
-
- // 这里用无参数的 tryLock() 方法
- boolean isLock = lock.tryLock();
- if (!isLock) {
- return Result.fail("不允许重复抢购");
- }
- try {
- IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
- return proxy.createOrder(voucherId, userId);
- } finally {
- lock.unlock();
- }
简单理解:当我获取锁的时候去判断是否有人拿了,然后判断是不是我自己拿的,也就是说是不是同一个线程(是同一个线程,就是我自己拿的,不是一个线程就是别人的),如果是同一个线程,就也让他获取锁,里边会有个计数器(会累加),回去记录获取了几次,以后释放锁的时候会去减1。当所有的业务都执行完,重入次数一定会被减成0,所以每次释放锁的时候,除了要-1,还需要去判断次数是否已经为0了,如果为0了,就可以直接删除锁了,否则不可以直接删锁
我们设计的,就是既要记录线程标识,也要记录线程的次数,显然String类型就不合适了,可以利用hash类型
Redisson多方面重置有效期,给后续任务留够充足的时间;
代码太多了,太复杂了,我们一定要保证代码的原子性,用Lua脚本去执行
获取锁的流程:
释放锁的流程:
利用Redisson测试可重入锁Demo,可以去打断点,根据断点去看Redis客户端
tryLock底层源码:
- @Slf4j
- @SpringBootTest
- class RedissonTest {
-
- @Resource
- private RedissonClient redissonClient;
-
- private RLock lock;
-
- @BeforeEach
- void setUp() {
- lock = redissonClient.getLock("order");
- }
-
- @Test
- void method1() throws InterruptedException {
- // 尝试获取锁
- boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
- if (!isLock) {
- log.error("获取锁失败 .... 1");
- return;
- }
- try {
- log.info("获取锁成功 .... 1");
- method2();
- log.info("开始执行业务 ... 1");
- } finally {
- log.warn("准备释放锁 .... 1");
- lock.unlock();
- }
- }
- void method2() {
- // 尝试获取锁
- boolean isLock = lock.tryLock();
- if (!isLock) {
- log.error("获取锁失败 .... 2");
- return;
- }
- try {
- log.info("获取锁成功 .... 2");
- log.info("开始执行业务 ... 2");
- } finally {
- log.warn("准备释放锁 .... 2");
- lock.unlock();
- }
- }
- }
他里边有一段逻辑,就是Lua脚本,redis是通过一个字符串的形式,将脚本直接写死了,我们是直接写一个文件里了
unlock();方法同理,底层也是一段Lua脚本
底层依然用的上面的Lua脚本 成功返回nil 就类似于 java 的null,失败返回的是锁的剩余有效期,pttl 是毫秒,注意返回的是一个future函数,该函数是一个异步函数(Async),返回没返回,下边的代码还不知道呢,
返回毫秒,下边这个get是一个阻塞等待,等待返回有效期,成功返回null,失败返回剩余时间
如果失败了,会走重试,那么也不是立刻就去重试,如下
- return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
- return nil;
- end;
- local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
- if (counter > 0) then
- redis.call('pexpire', KEYS[1], ARGV[2]);
- return 0;
- else
- redis.call('del', KEYS[1]);
- redis.call('publish', KEYS[2], ARGV[1]);
- return 1;
- end;
- return nil;",
- Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
!=null,上边 释放锁的脚本里,有一行叫publish命令,发布一个消息通知,而subscribe叫做订阅消息,那边发布了,这边就知道了,然后才会再次尝试重新执行(他还有一个等待时间,如果说等待时间超过了锁的剩余最大等待时间结束,那就不用等待了,返回false,那么非false返回true,)
- -- 源码
- public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
-
- -- 最大等待时间
- long time = unit.toMillis(waitTime);
- -- 当前时间
- long current = System.currentTimeMillis();
- long threadId = Thread.currentThread().getId();
- -- 获取锁的剩余时间
- Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
- -- 等于null 说明成功了
- if (ttl == null) {
- return true;
- } else {
- -- 否则 计算出最大剩余等待时间
- time -= System.currentTimeMillis() - current;
- -- 小于等于0 失败了
- if (time <= 0L) {
- this.acquireFailed(waitTime, unit, threadId);
- return false;
- } else {
- -- 再次计算时间
- current = System.currentTimeMillis();
- -- 订阅消息
- RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
- -- 等待时间time,就是锁的剩余最大等待时间,当RFuture在,指定时间内完成返回true
- -- 如果等待这个时间结束,还没有收到释放锁的通知就没必要等了,就会返回false,非就是true,就会进if里边
- if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
- if (!subscribeFuture.cancel(false)) {
- subscribeFuture.onComplete((res, e) -> {
- if (e == null) {
- this.unsubscribe(subscribeFuture, threadId);
- }
-
- });
- }
- // 然后告诉我们获取锁 失败了
- this.acquireFailed(waitTime, unit, threadId);
- return false;
- }
else 相反,在最大等待时间内,收到了通知
- try {
- -- 再次获取最大等待时间,注意,是减去上边代码 current 等待的消耗时间
- -- time = 剩余时间-等待的时间,得到的是这次的最大等待时间
- time -= System.currentTimeMillis() - current;
- -- 如果也小于等于0,那就也return false
- if (time <= 0L) {
- this.acquireFailed(waitTime, unit, threadId);
- boolean var20 = false;
- return var20;
- } else {
-
- -- 反之时间依然有剩余, 那么我们就终于可以去重试了
- boolean var16;
- do {
- -- 得到当前时间
- long currentTime = System.currentTimeMillis();
- -- 终于开始第一次重试获取锁了
- ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
- -- 等于null 获取成功
- if (ttl == null) {
- var16 = true;
- return var16;
- }
- -- 否则 继续判断是否超时
- time -= System.currentTimeMillis() - currentTime;
- if (time <= 0L) {
- this.acquireFailed(waitTime, unit, threadId);
- var16 = false;
- return var16;
- }
- -- 如果剩余时间还有,那就继续尝试
- currentTime = System.currentTimeMillis();
- -- 不同的是这里获取的是getLatch 信号量,释放锁的会释放一个信号量,依然有最大等待时间,两种情况,
- -- 第一,当ttl<剩余等待时间时,就会等ttl时间,时间到了会立即释放
- if (ttl >= 0L && ttl < time) {
- ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- -- 否则 就等time时间,time一旦到期,你还没释放,我也就没必要等了
- } else {
- ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
- }
- -- 再看下时间
- time -= System.currentTimeMillis() - currentTime;
- -- 如果时间依旧很充足,那就不停地尝试,等待,尝试等待,不停地循环
- } while(time > 0L);
-
- this.acquireFailed(waitTime, unit, threadId);
- var16 = false;
- return var16;
- }
- } finally {
- this.unsubscribe(subscribeFuture, threadId);
- }
这个巧妙的机制就是他利用了消息订阅,和信号量机制,导致他不是无休止的、盲目的等待或重试机制,(释放了,尝试,释放了,尝试),这样的话,我们的Redisson锁就说明它是支持尝试机制的,只要我们给了第一个参数tryLock(long waitTime, TimeUtil unit);来解决锁的重试
必须确保是业务执行完 锁的释放,而不是因为阻塞了而释放,
继续根据Redisson 的tryLock();空参数方法的 tryAcquire方法
- public RFuture<Boolean> tryLockAsync() {
- // 来了一个线程
- return this.tryLockAsync(Thread.currentThread().getId());
- }
- private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
- if (leaseTime != -1L) {
- return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
- } else {
-
- // 释放时间=-1 ,那么看门狗的默认时间=30
- RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
-
- // 当ttlRemainingFuture 完成返回以后,传进来剩余有效期 和 异常
- // 异常!=null 是抛异常了,是有问题了,就什么都不做
- ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
- if (e == null) {
- if (ttlRemaining) {
- // 自动调度 自动更新续约的一个功能
- this.scheduleExpirationRenewal(threadId);
- }
-
- }
- });
-
- return ttlRemainingFuture;
- }
- }
- private void scheduleExpirationRenewal(long threadId) {
- RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
-
- // 这里EXPIRATION_RENEWAL_MAP 是一个currentHashMap,是一个静态的,
- // 第一个参数就可以理解为key 是个字符串,可以理解为当前锁的名称,值就是entry
- // 是静态的,他的实例类,都可以看到map,调用的话,就都是不同的,一个锁对应一个entry
-
- // putIfAbsent,如果不存在,才往里面放,放进去的就一个全新entry,返回值就是null
- // 如果说不是一次来,重入的,第二次或第三次,那就不创建,返回值是第一次的entry,确保
- // 不管这把锁重入几次,将来拿到的都是同一个entry
- // 这个map的作用就是保证同一把锁,拿到的永远是同一个entry,
- RedissonLock.ExpirationEntry oldEntry =
- (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
-
- // 其实,不同的线程,是不可能拿到同一把锁的, 所以老的 的话,一定是同一个线程多次来
- // 其实就是一种重入,把线程ID加进去
- if (oldEntry != null) {
- // 不管是旧的,来了一个线程,就把这个线程加进来
- oldEntry.addThreadId(threadId);
-
- } else {
- // 还是新的,都会加进来
- entry.addThreadId(threadId);
- // 如果是第一次来,还会更新有效期
- this.renewExpiration();
- }
-
- }
-
-
-
-
-
-
- // 源码 就是同一个线程就是做了+1操作
-
- public synchronized void addThreadId(long threadId) {
- Integer counter = (Integer)this.threadIds.get(threadId);
- if (counter == null) {
- counter = 1;
- } else {
- counter = counter + 1;
- }
-
- this.threadIds.put(threadId, counter);
- }
如果是新的,更新有效期
- // 更新有效期
- private void renewExpiration() {
- // 首先从map里得到该entry
- RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
-
- if (ee != null) {
- // timeout 超时任务,或定时任务
- // .newTimeout(),有两个参数,1、任务本身,2、是延迟,3、时间单位,也就是说,这个任务会在时间单位到期以后才会执行,所以是一个延时的任务
- /* 这个延时任务过了 this.internalLockLeaseTime / 3L 才会执行 内部锁释放时间/3
- 因为我们没有传,默认是-1,而获取了看门狗的时间30秒
- 30/3=10,也就是10秒之后,开始执行
- */
- Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- public void run(Timeout timeout) throws Exception {
-
- // 10秒后拿出entry
- RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
- if (ent != null) {
-
- // 从entry里取出线程ID
- Long threadId = ent.getFirstThreadId();
-
- if (threadId != null) {
-
- // 紧接着,开始刷新有效期,里边是一个Lua脚本,如下图所示
- // 做了两件事,1、判断锁是不是我拿的,2、尝试去更新有效期expire
- // 该行代码就是重置当前线程锁的有效期
- RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
-
- // 执行完了以后执行
- future.onComplete((res, e) -> {
- if (e != null) {
- RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
- } else {
- if (res) {
-
- // 递归,再调自己,10秒后又重置,总结来说就是永不过期
- RedissonLock.this.renewExpiration();
- }
-
- }
- });
- }
- }
- }
- }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
-
- // 最后把任务放到entry里了,所以这个entry封装的是两个东西,
- // 第一个:是当前线程的id
- // 第二个:就是这个定时的任务
- // 所以,上边放旧的 的时候,就不用执行刷新有效期了,
- ee.setTimeout(task);
- }
- }
重置当前线程持有锁的有效期Lua
- // 重置有效期
- protected RFuture<Boolean> renewExpirationAsync(long threadId) {
- return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- "
- if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
- redis.call('pexpire', KEYS[1], ARGV[1]);
- return 1;
- end;
- return 0;
- "
- , Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
- }
释放锁:unlock
- public void unlock() {
- try {
- // 异步执行unlock
- this.get(this.unlockAsync(Thread.currentThread().getId()));
- } catch (RedisException var2) {
- if (var2.getCause() instanceof IllegalMonitorStateException) {
- throw (IllegalMonitorStateException)var2.getCause();
- } else {
- throw var2;
- }
- }
- }
-
-
- public RFuture<Void> unlockAsync(long threadId) {
- RPromise<Void> result = new RedissonPromise();
-
- // 执行完后
- RFuture<Boolean> future = this.unlockInnerAsync(threadId);
- future.onComplete((opStatus, e) -> {
-
- // 执行取消任务
- this.cancelExpirationRenewal(threadId);
- if (e != null) {
- result.tryFailure(e);
- } else if (opStatus == null) {
- IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
- result.tryFailure(cause);
- } else {
- result.trySuccess((Object)null);
- }
- });
- return result;
- }
-
-
-
-
- -- 删除整个定时任务,删除后,锁的释放就完成了
- void cancelExpirationRenewal(Long threadId) {
- // 从map里获取 到 当前这把锁的任务
- RedissonLock.ExpirationEntry task = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
- if (task != null) {
- if (threadId != null) {
- // 删除 线程id
- task.removeThreadId(threadId);
- }
-
- if (threadId == null || task.hasNoThreads()) {
- // 取出 timeout任务
- Timeout timeout = task.getTimeout();
- if (timeout != null) {
- // 将任务删除
- timeout.cancel();
- }
-
- // 最后再把entry 取消掉即可
- EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
- }
-
- }
- }
获取锁流程:
释放锁的流程,右侧是释放锁,左侧流程图是获取锁
redisson分布式锁主从一致性问题
产生的原因:
redisson是如何解决主从一致性问题呢的?
搭建三台redis节点,形成一个联锁
修改Redisson配置,例如:如下三个独立的RedissClient,利用这三个分别获取独立的锁,将三个独立的锁联在一起,变成联锁
- @Configuration
- public class RedissonConfig {
-
- @Bean
- public RedissonClient redissonClient() {
- Config config = new Config();
- config.useSingleServer().setAddress("redis://localhost:6379");
- return Redisson.create(config);
- }
-
- @Bean
- public RedissonClient redissonClient2() {
- Config config = new Config();
- config.useSingleServer().setAddress("redis://localhost:6380");
- return Redisson.create(config);
- }
-
- @Bean
- public RedissonClient redissonClient3() {
- Config config = new Config();
- config.useSingleServer().setAddress("redis://localhost:6390");
- return Redisson.create(config);
- }
- }
新建单元测试,创建联锁:
- @Slf4j
- @SpringBootTest
- class RedissonTest {
-
- @Resource
- private RedissonClient redissonClient;
-
- @Resource
- private RedissonClient redissonClient2;
-
- @Resource
- private RedissonClient redissonClient3;
-
- private RLock lock;
-
- @BeforeEach
- void setUp() {
- RLock lock = redissonClient.getLock("order");
- RLock lock2 = redissonClient2.getLock("order");
- RLock lock3 = redissonClient3.getLock("order");
-
- // 创建联锁 multiLock
- RLock multiLock = redissonClient.getMultiLock(lock, lock2, lock3);
- }
点进去,进去发现底层代码是new的,也就是说,我去调的时候都是new的,不光redissonClient调可以,redissonClient2 和redissonClient3 调都是一样的,底层都是重新new的,传进来的参数是一个可变数组,即使将来我自己new 都没有问题
- public RLock getMultiLock(RLock... locks) {
- return new RedissonMultiLock(locks);
- }
再跟进去,可以看出来是将可变参数转成了集合,放到了成员变量里,也就是说多个独立的锁,都放到集合locks里了,按照联锁的原理,以后是会把这个集合里的锁都去尝试获取一遍,都成功了,才算成功,别的代码都不需要动即可
- final List<RLock> locks = new ArrayList();
-
- public RedissonMultiLock(RLock... locks) {
- if (locks.length == 0) {
- throw new IllegalArgumentException("Lock objects are not defined");
- } else {
- this.locks.addAll(Arrays.asList(locks));
- }
- }
打断点可知,所谓的联锁,就是多个独立的锁,而每个独立的锁和之前的原理是一样的,包括获取 技术+1 -1,删除锁,原理都一样
联锁的tryLock:底层代码分析
- boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
-
- // 应为我们没传释放时间,默认是-1
- public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
- return this.tryLock(waitTime, -1L, unit);
- }
- public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
-
- /*
- 这一段,我们只有传了过期时间才会走
- 对于释放时间的处理
- */
- long newLeaseTime = -1L;
- if (leaseTime != -1L) {
-
- // 如果是-1 说明只想获取一次,也就是说不重试,释放时间是多久那就是多久
- if (waitTime == -1L) {
- newLeaseTime = unit.toMillis(leaseTime);
- } else {
-
- // 说明锁获取失败后想要重试,就不会我们传进来的释放时间了,会把等待时间*2
- // 重试可能耗时比较久,万一重试时间小于等待时间,还没重试完呢,就释放了,就有问题了
- newLeaseTime = unit.toMillis(waitTime) * 2L;
- }
- }
-
- // 当前时间
- long time = System.currentTimeMillis();
- // 剩余时间,初始化-1
- long remainTime = -1L;
- if (waitTime != -1L) {
- // 如果传了等待时间,那就回做替换操作,也就是说remainTime就是等待时间
- remainTime = unit.toMillis(waitTime);
- }
-
- // 计算锁的等待时间,锁等待时间=剩余等待时间
- long lockWaitTime = this.calcLockWaitTime(remainTime);
-
- // 失败的锁的限制,是0
- int failedLocksLimit = this.failedLocksLimit();
-
- // 获取成功的锁集合,while 循环都执行完成功的,就拿到了所有的锁
- List<RLock> acquiredLocks = new ArrayList(this.locks.size());
-
- // 遍历我们的三个独立的锁,因为我们前边只创建了三个Redis节点
- ListIterator iterator = this.locks.listIterator();
-
- while(iterator.hasNext()) {
- // 先拿到锁
- RLock lock = (RLock)iterator.next();
-
- // 代表获取锁成功,或失败
- boolean lockAcquired;
- try {
-
- // tryLock为空参数的情况,只一次,不重试,尝试是否获取到锁
- if (waitTime == -1L && leaseTime == -1L) {
- lockAcquired = lock.tryLock();
- } else {
-
- // 非空参数的情况,带有重试的,尝试是否获取到锁
- long awaitTime = Math.min(lockWaitTime, remainTime);
- lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
- }
- } catch (RedisResponseTimeoutException var21) {
- this.unlockInner(Arrays.asList(lock));
- lockAcquired = false;
- } catch (Exception var22) {
- lockAcquired = false;
- }
-
- // 去判断获取锁有没有成功
- if (lockAcquired) {
-
- // 成功的话就把锁放到 已经成功的锁的集合里
- acquiredLocks.add(lock);
-
- // 获取锁失败,没有获取到锁
- } else {
-
- // 锁的总数 - 已经获取的锁的数量 == 0,5-2==3? false
- if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) {
- // 已经获取的锁的数量=锁的总数,才会=0 ,才能跳出循环,只有把锁都拿到了才会结束
- break;
- }
-
- // 是0
- if (failedLocksLimit == 0) {
- this.unlockInner(acquiredLocks);
-
- // 如果是-1 证明不想做重试,那就一次失败,直接失败,返回false
- if (waitTime == -1L) {
- return false;
- }
-
- // 如果想重试
- failedLocksLimit = this.failedLocksLimit();
-
- // 先把已经拿到的锁清空,即使一把锁都没拿到
- acquiredLocks.clear();
-
- // 把迭代器往前迭代,就是把指针指向第一个,因为要重试,重头再来,从第一把锁开始
- while(iterator.hasPrevious()) {
-
- // 循环往前迭代,循环完最终指针就指向了第一个元素,然后结束,又开始新一轮for循环(这里是最外层的while循环),也就是说,要么把所有的锁都拿到,要么失败了重试,直到等待时间小于0,超时为止!,这就是整个获取锁的流程
- iterator.previous();
- }
- } else {
- --failedLocksLimit;
- }
- }
-
- // 判断剩余等待时间是否是-1,如果不是,说明剩余时间很充足
- if (remainTime != -1L) {
-
- // 得到现在剩余时间
- remainTime -= System.currentTimeMillis() - time;
- time = System.currentTimeMillis();
-
- // 如果剩余时间小于等于0,说明刚才获取锁,已经把等待时间给耗尽了,代表获取锁超时了,只能false失败,但是,在false 会先去把已经获取到的这些锁释放掉(集合),因为已经失败了,前边拿到的锁就不能再拿了,拿了别人就拿不到了,干脆就释放掉了
- if (remainTime <= 0L) {
- this.unlockInner(acquiredLocks);
- return false;
- }
- }
- // 如果时间还很充足,那就开始下一次循环了,正好while循环结束,开始下一次循环
- }
-
-
- // leaseTime!=-1 在结束前才会执行这段逻辑。锁的释放时间,我们之前是没有传的
- // leaseTime ==-1 不传就会触发看门狗机制,默认30,而且会自动去续期,所以不需要这里去处理
- // 如果传了就没了,需要自己指定锁的释放时间
- // 推荐释放时间不传,不要设置,用看门狗的就可以了
- if (leaseTime != -1L) {
-
- // 传了的逻辑
- List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size());
- Iterator var24 = acquiredLocks.iterator();
-
- // 遍历拿到的每一把锁
- while(var24.hasNext()) {
- RLock rLock = (RLock)var24.next();
-
- // 会执行expireAsync expire命令,设置有效期,也就是说会给没把所都重新设置一下有效期,因为我们获取锁的时候会有多个redis节点依次获取,第一把锁在获取之后,立即就开始倒计时了,而最后一把锁再获取后 会刚开始倒计时,也就是说,到这个循环的时候,第一把锁的剩余有效期,一定会比最后一把锁的剩余有效期的时间要短一些,这样就有可能出现一些问题,有些锁释放了,有些没有释放的情况,避免这个问题的发生,这行代码的意思就是,等所有锁都拿到了,给所有的锁都重新设置一下有效期,确保大家的有效期是一样的
- RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
- futures.add(future);
- }
-
- var24 = futures.iterator();
-
- while(var24.hasNext()) {
- RFuture<Boolean> rFuture = (RFuture)var24.next();
- rFuture.syncUninterruptibly();
- }
- }
-
- // 成功了 返回true
- return true;
- }
总结:
客户端---通过nginx---负载均衡到tomact---tomact内部大量的请求(串行执行)下图箭头这四步都会直接打到DB,而数据库本身的并发能力就差,而且还有写操作,避免分布式问题,我们还加了锁,所以相对来说整个业务的耗时就较长,虽然也是毫秒级别的
利用Redis完成对秒杀库存的判断,和一人一单的判断
优惠券库存判断:利用String
一人一单功能:需要记录当前业务的优惠券,被那些用户购买过,以后再有用户来的时候,只需要判断是否存在,存在证明购买过,那就不能在买了
业务逻辑:0 是有购买资格,1、2没有购买资格,业务较长,要确保原子性(利用Lua命令)
利用Lua脚本来实现
如果是0 才会有购买资格,放入队列里,然后开启一个新的独立线程,来读取提前保存好客户的信息,就可以异步的来完成数据库的写的操作了;
在返回订单id给用户的那一刻,其实秒杀业务已经结束了,用户拿到了id 就可以去付款了,至于我们什么时候完成下单信息,将优惠券相关信息写到数据库的操作,时效性上已经不那么高了。
正是因为我们将同步的写数据操作,变成了异步操作,
需求:
VoucherServiceImpl类,添加优惠券信息,这个库存可以永久保存到Redis当中,将来移除秒杀的时候,只需要手动删除即可
- @Resource
- private StringRedisTemplate stringRedisTemplate;
-
-
-
- @Override
- @Transactional
- public void addSeckillVoucher(Voucher voucher) {
- // 保存优惠券
- save(voucher);
- // 保存秒杀信息
- SeckillVoucher seckillVoucher = new SeckillVoucher();
- seckillVoucher.setVoucherId(voucher.getId());
- seckillVoucher.setStock(voucher.getStock());
- seckillVoucher.setBeginTime(voucher.getBeginTime());
- seckillVoucher.setEndTime(voucher.getEndTime());
- seckillVoucherService.save(seckillVoucher);
-
- // 1、新增秒杀优惠券的同时,将优惠券信息保存到Redis中,利用String结构,这里不需要配置剩余有效期,如果将来我们不需要了,再手动将它删除
- stringRedisTemplate.opsForValue().
- set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
- }
-
-
-
-
- 工具类定义常量
- public static final String SECKILL_STOCK_KEY = "seckill:stock:";
然后利用postman去发起请求添加秒杀券即可,报文利用之前的即可
- -- 1.参数列表
- -- 1.1.优惠券id
- local voucherId = ARGV[1]
- -- 1.2.用户id
- local userId = ARGV[2]
-
- -- 2.数据key
- -- 2.1.库存key 注意:lua脚本里 字符串拼接是用.. 而不是用+ 加号了
- local stockKey = 'seckill:stock:' .. voucherId
- -- 2.2.订单key
- local orderKey = 'seckill:order:' .. voucherId
-
- -- 业务1 判断库存是否充足
- if (tonumber(redis.call('get', stockKey) <= 0)) then
- return 1;
- end
-
- -- 业务2 判断用户是否下单,等于1存在,等于0不存在
- if (redis.call('sismember', orderKey, userId) == 1) then
- return 2;
- end
-
- -- 业务3 说明库存充足且用户没有下单
- -- 3.1 扣库存 利用incrby -1 修改库存
- redis.call('incrby', stockKey, -1)
-
- -- 3.2 下单保存用户 用sadd key faild
- redis.call('sadd', orderKey, userId)
-
- -- 成功返回0
- return 0
脚本写完后需要在java代码里去执行脚本
VoucherOrderServiceImpl类修改实现方法seckillVoucher
1、引入lua脚本
- private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
- static {
- SECKILL_SCRIPT = new DefaultRedisScript<>();
- SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
- SECKILL_SCRIPT.setResultType(Long.class);
- }
2、修改代码
- public Result seckillVoucher(Long voucherId) {
- // 获取用户id
- Long userId = UserHolder.getUser().getId();
- // 1.执行lua脚本,注意,我们这里没有传key,但是也不能传null 或"",它是一个空集合
- Long result = stringRedisTemplate.execute(
- SECKILL_SCRIPT,
- Collections.emptyList(),
- voucherId.toString(), userId.toString()
- );
- // 2.判断结果是否为0,这里是Long类型,包装类转换基本类型然后再去比较
- int r = result.intValue();
- if (r != 0) {
- // 2.1.不为0 ,代表没有购买资格
- Result.fail(r == 1 ? "库存不足" :"客户已下单");
- }
- // 2.2 为0 将任务放到阻塞队列里
-
- long orderId = redisIdWorker.nextId("order");
- // 3.返回订单id
- return Result.ok(orderId);
- }
修改代码
- public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
-
- @Resource
- private ISeckillVoucherService seckillVoucherService;
-
- @Resource
- private RedisIdWorker redisIdWorker;
- @Resource
- private RedissonClient redissonClient;
- @Resource
- private StringRedisTemplate stringRedisTemplate;
-
- private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
- static {
- SECKILL_SCRIPT = new DefaultRedisScript<>();
- SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
- SECKILL_SCRIPT.setResultType(Long.class);
- }
-
-
- // 定义一个线程池,单线程的
- private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
-
- // 定义一个阻塞队列,范型就是封装的类型,参数是队列的大小
- private static final BlockingQueue<VoucherOrder> orderTask = new ArrayBlockingQueue<VoucherOrder>(1024*1024);
-
-
-
- // 客户随时都在下单,这个类一加载完成后,就应该开始从阻塞队列里取数据了,利用springBoot的一个注解@PostConstruct注解来实现
- @PostConstruct // 这个注解的意思就是,下边的这个方法在当前类初始化完成后就开始执行
- private void init() {
- SECKILL_ORDER_EXECUTOR.submit(new handlerVoucherOrder());
- }
-
- // 定义一个线程任务,什么时候去执行?肯定是在用户秒杀抢购之前开始,因为用户一旦秒杀,任务就应该去从阻塞队列里取出订单信息
- private class handlerVoucherOrder implements Runnable{
-
- @Override
- public void run() {
- // 不断的从队列里取,定义一个死循环
- while (true) {
- // 从队列里获取订单信息,尽管while(true),但是,他是队列,有才取,没有就等待
- try {
- VoucherOrder voucherOrder = orderTask.take();
- // 创建订单
-
- proxy.createVoucherOrder(voucherOrder);
- } catch (Exception e) {
- log.info("订单创建异常", e);
- }
- }
- }
- }
-
- @Transactional
- public void createVoucherOrder(VoucherOrder voucherOrder) {
- // 注意,这里不不能用UserHolder来取,线程变了,会取不到的
- Long userId = voucherOrder.getUserId();
- Long voucherId = voucherOrder.getVoucherId();
-
- // 创建锁对象,其实这里获取锁,是一个兜底方案,也可以不用的,算是兜底防止redis崩了
- RLock redisLock = redissonClient.getLock("lock:order:" + userId);
-
- // 尝试获取锁
- boolean isLock = redisLock.tryLock();
-
- // 判断
- if (!isLock) {
- // 获取锁失败,直接返回失败或者重试,没必要返回了,直接是后段操作,不用返回前段,没意义
- log.error("不允许重复下单!");
- return;
- }
-
- // 以下代码是操作数据库的,前边我们只操作了redis缓存里的,这里是数组异步操作
- try {
- // 5.1.查询订单
- int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
- // 5.2.判断是否存在
- if (count > 0) {
- // 用户已经购买过了
- log.error("不允许重复下单!");
- return;
- }
-
- // 6.扣减库存
- boolean success = seckillVoucherService.update()
- .setSql("stock = stock - 1") // set stock = stock - 1
- .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
- .update();
- if (!success) {
- // 扣减失败
- log.error("库存不足!");
- return;
- }
-
- // 这里直接save即可
- save(voucherOrder);
- } finally {
- // 释放锁
- redisLock.unlock();
- }
- }
-
- // 代理定义成一个成员变量
- private IVoucherOrderService proxy = null;
- @Override
- public Result seckillVoucher(Long voucherId) {
- // 获取用户id
- Long userId = UserHolder.getUser().getId();
- // 1.执行lua脚本,注意,我们这里没有传key,但是也不能传null 或"",它是一个空集合
- Long result = stringRedisTemplate.execute(
- SECKILL_SCRIPT,
- Collections.emptyList(),
- voucherId.toString(), userId.toString()
- );
- // 2.判断结果是否为0,这里是Long类型,包装类转换基本类型然后再去比较
- int r = result.intValue();
- if (r != 0) {
- // 2.1.不为0 ,代表没有购买资格
- return Result.fail(r == 1 ? "库存不足" :"客户已下单");
- }
-
- // 2.2 为0 将任务放到阻塞队列里,将订单id 用户id 代金券id 进行封装
- VoucherOrder voucherOrder = new VoucherOrder();
- // 2.3.订单id
- long orderId = redisIdWorker.nextId("order");
- voucherOrder.setId(orderId);
- // 2.4.用户id
- voucherOrder.setUserId(userId);
- // 2.5.代金券id
- voucherOrder.setVoucherId(voucherId);
-
- // 2.6.放入阻塞队列
- orderTask.add(voucherOrder);
- /*
- 2.7 添加代理对象,只有在主线程 才能实现代理,子线程是不可以的,因为它的底层也是ThreadLocal 去获取的,所以将代理对象
- 放在主线程里,要么当作参数传,要么写到成员变量
- 这里我们当作成员变量,放到主线程里去提前获取
- */
- proxy = (IVoucherOrderService) AopContext.currentProxy();
-
- // 3.返回订单id
- return Result.ok(orderId);
- }
但是,基于JVM阻塞队列实现秒杀会有两个问题,
内存限制:
数据安全问题:
消息队列可以理解为:快递员-快递柜-我
快递员送快递送多个人,将快递放到快递柜,每个人去拿自己,完成
如果说快递员送快递,快递柜没位置了,我又在公司上班,当我回家了,快递员又不上班,放在门口又怕丢,两边难受,而有了快递柜,快递员只需要将快递放在快递柜里,只要发个短信,我们什么时候有时间就去拿就好了,这样就解除了快递员和我们的耦合
为什么用消息队列?
市面上的消息队列,像阿里的消息队列、kafka等,搭建消息队列还是费点劲的
Redis有自己的消息队列
首先list是在内存之外的,而且redis是支持持久化的,数据安全性有保障
但是无法避免消息丢失,如果取出一个数据,list的pop是取出并且移除了,如果服务挂了,那就丢失了,其他消费者也拿不到,而且他还是单消费者,无法实现一条消息被多消费者消费的,有的业务需要多消费者消费的
psubscribe 是有通配符的,如:?是一个自负,*是任意,[ ad ] 制定字符
如果生产者发送的是:publist order.queue1,那么只有第二个消费可以拿到,第一个消费者是拿不到的,因为第二个消费者拿的是通配符的,第一个是固定的
redis-server 启动redis
redis-cli:开始redis客户端口,可以开启多个,如下,输入order.q2的时候只有一个窗口可以收到
缺点:不支持持久化,list支持是因为list本身就不是一个消息队列,是一个链表,用来做数据存储的,只不过我们把它当作消息队列来用了,Redis用来做数据存储的 都是支持数据持久化的。
而pubSub本身设计出来就是用来做消息发送的,如果说消息发送了,而这个频道没有被任何人订阅,那么这个频道就丢失了,也就是说我们发送的所有消息不会在redis里保存,如果没人收,直接就没了,不要了,这就是最大的问题。
不支持持久化,安全性没有保障
而且消息堆积有上线,虽然发送的消息不在内存中保存,发送的消息如果有消费者监听,会在消费者那里有一个缓存区域,把这个消息缓存下来,接下来消费者去处理,如果消费者处理的比较慢,处理一条消息需要1秒,如果说此时又来了10几条消息,那么这些消息都会缓存在客户端那里,就是消费者那里,而消费者那里的缓存空间是有上限的,如果超出了就会丢失
同一个消息,两个消费者都可以读取,说明不是单消费者的,而且在stream里,消息是永久存在的,读完之后是不会删除的,这里是从第一条消息,下标0开始的
如果想读最新消息那就用$,下边返回nil 是因为第一条消息已经读取过了,没有最新消息
如果想加上阻塞读取,就是有消息就读取,没消息就等待,那就加上bluck 后边跟一个阻塞毫秒时间,0是永久等待,如下,阻塞等待中,直到2发送了一条消息,1才读取到
阻塞读取了
基于stram实现阻塞队列,java伪代码,利用死循环来实现阻塞取,等待,无限尝试
注意当指定$时候,代表我们拿到的是最新消息,就是最后一条消息,可能会出现漏读消息的情况,比如当我们处理消息的时候,此时又来了7、8条数据,那么它只会读取最后一条数据 ,这样就出现漏读的情况了
总结:
消息可回溯:就是消息永久存在,不会丢失
将多个消费者分到一个组中,监听同一队列
一般情况下并不需要我们手动添加消费者,因为当我们从这个组当中指定一个消费者并且监听消息的时候,如果它发现这个消费者不存在,就是自动帮我们创建
案例:注意:一个组内,只有一个标记,所以消费者2 和消费者1 每次拿到的都不一样
虽然读了这么多消息,但是都没有确认过,利用XACK命令进行确认
xack key group id[id...],执行后确认,这种是正常情况 key队列名称,group组的名称
异常情况:刚拿到一条消息,处理的时候,挂嘞,这条消息没有确认ack,消费者只要拿到消息,一定会进去pending-list里,那么如何查看pending-list呢
查看pending-list里的消息
命令:【】获取消息以后,确认之前的空闲时间 ,比如获取5000毫秒以上的消息,可以不给,即所有的我都要
start end pending-list里的 起始和结束的位置如:- + 最小到最大
查询pending-list的消息,读取pending-list里的第一条消息
这就又一次拿到了k6了,可以再次去处理嘞
然后去处理,确认ack,之后再去pending-list里拿,就没有了,这样就将正常消息,和异常消息全都处理嘞
伪代码:所有消息处理都必须成功,实在不成功那就人工去处理,catch的代码,又空判断,是因为如果pending-list处理完确认后,会再次返回,然后没有了的话不就返回null了,然后就可以break了,确保消息一定被执行
缺点,Redis的持久化,也不是万无一失的,也有丢失风险的,而且stream,只支持消费者的确认机制,是不支持生产者的确认机制的,建议还是用专业的消息队列如阿里的,还有消费的事务机制等,其实一般来说,stream已经可以满足中小型企业的需要了
xgroup create stream.orders g1 0 mkstream 队列不存在时自动创建队列 stream.orders 是队列,g1 是组; 因为是新创建的队列,id直接 为0即可
- -- 1.3.订单id
- local orderId = ARGV[3]
保存用户后立刻将数据发送到消息队列里,用stream xadd
-
- -- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
- redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
修改VoucherOrderServiceImpl类
- // 代理定义成一个成员变量
- private IVoucherOrderService proxy = null;
- @Override
- public Result seckillVoucher(Long voucherId) {
- // 获取用户id
- Long userId = UserHolder.getUser().getId();
-
- // 2.3.订单id
- long orderId = redisIdWorker.nextId("order");
-
- // 1.执行lua脚本,注意,我们这里没有传key,但是也不能传null 或"",它是一个空集合
- Long result = stringRedisTemplate.execute(
- SECKILL_SCRIPT,
- Collections.emptyList(),
- voucherId.toString(), userId.toString(), String.valueOf(orderId)
- );
- // 2.判断结果是否为0,这里是Long类型,包装类转换基本类型然后再去比较
- int r = result.intValue();
- if (r != 0) {
- // 2.1.不为0 ,代表没有购买资格
- return Result.fail(r == 1 ? "库存不足" :"客户已下单");
- }
-
- // 如下代码就省略了,不用放到阻塞队列里嘞,直接利用lua脚本放到消息队列里
- /*
- // 2.2 为0 将任务放到阻塞队列里,将订单id 用户id 代金券id 进行封装
- VoucherOrder voucherOrder = new VoucherOrder();
- voucherOrder.setId(orderId);
- // 2.4.用户id
- voucherOrder.setUserId(userId);
- // 2.5.代金券id
- voucherOrder.setVoucherId(voucherId);
- // 2.6.放入阻塞队列
- orderTask.add(voucherOrder);*/
-
- /*
- 2.7 添加代理对象,只有在主线程 才能实现代理,子线程是不可以的,因为它的底层也是ThreadLocal 去获取的,所以将代理对象
- 放在主线程里,要么当作参数传,要么写到成员变量
- 这里我们当作成员变量,放到主线程里去提前获取
- */
- proxy = (IVoucherOrderService) AopContext.currentProxy();
-
- // 3.返回订单id
- return Result.ok(orderId);
- }
阻塞队列就不需要了,修改阻塞队列这一块的逻辑,如下的代码,全都注释,不要了
- // 定义一个阻塞队列,范型就是封装的类型,参数是队列的大小
- private static final BlockingQueue<VoucherOrder> orderTask = new ArrayBlockingQueue<VoucherOrder>(1024*1024);
- // 定义一个线程任务,什么时候去执行?肯定是在用户秒杀抢购之前开始,因为用户一旦秒杀,任务就应该去从阻塞队列里取出订单信息
- private class handlerVoucherOrder implements Runnable{
-
- @Override
- public void run() {
- // 不断的从队列里取,定义一个死循环
- while (true) {
- // 从队列里获取订单信息,尽管while(true),但是,他是队列,有才取,没有就等待
- try {
- VoucherOrder voucherOrder = orderTask.take();
- // 创建订单
-
- proxy.createVoucherOrder(voucherOrder);
- } catch (Exception e) {
- log.info("订单创建异常", e);
- }
- }
- }
- }
新增加一个线程任务
- private class handlerVoucherOrder implements Runnable{
- String queueName = "stream.orders";// 和lua脚本的名字一定不要写错,写错就报错了
- @Override
- public void run() {
- while (true) {
- try {
- // 获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams streams.orders >
- // consumer 消费者的参数; options选项 条数,阻塞时间; streamOffset偏移量 指定队列名称 和 id
- // 返回list 集合,可能是读取1条,有可能是多条,count
- List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
- Consumer.from("g1", "c1"),
- StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
- StreamOffset.create(queueName, ReadOffset.lastConsumed())
- );
- // 判断消息是否为空
- if (list == null || list.isEmpty()) {
- // 如果失败,说明没有消息,继续下一次循环
- continue;
- }
- // 如果成功,可以下单
- // 解析消息中的订单list, string 是id 后边的两个object 是键值对,就是我们lua脚本里的 插入消息的键值对
- MapRecord<String, Object, Object> record = list.get(0);
- Map<Object, Object> value = record.getValue();
- // map 转对象
- VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
- createVoucherOrder(voucherOrder);
- // ack 确认 SACK stream.orders g1 id
- stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
- } catch (Exception e) {
- handlePendingList();
- log.info("订单创建异常", e);
- }
- }
- }
-
- private void handlePendingList() {
- while (true) {
- try {
- // 获取pending-list中的订单信息 xreadgroup group g1 c1 count 1 streams streams.orders 0
- // consumer 消费者的参数; options选项 条数,阻塞时间; streamOffset偏移量 指定队列名称 和 id
- // 返回list 集合,可能是读取1条,有可能是多条,count
- List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
- Consumer.from("g1", "c1"),
- StreamReadOptions.empty().count(1),
- StreamOffset.create(queueName, ReadOffset.from("0")) // 没有0的常量,可以自己传
- );
- // 判断消息是否为空
- if (list == null || list.isEmpty()) {
- // 如果失败,说明没有消息,结束循环
- break;
- }
- // 如果成功,可以下单
- // 解析pending-list的订单list, string 是id 后边的两个object 是键值对,就是我们lua脚本里的 插入消息的键值对
- MapRecord<String, Object, Object> record = list.get(0);
- Map<Object, Object> value = record.getValue();
- // map 转对象
- VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
- createVoucherOrder(voucherOrder);
- // ack 确认 SACK stream.orders g1 id
- stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
- } catch (Exception e) {
- log.info("订单创建异常", e);
- try {
- Thread.sleep(50);
- } catch (InterruptedException interruptedException) {
- interruptedException.printStackTrace();
- }
- }
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。