当前位置:   article > 正文

Redis5、全局ID、乐观锁修改,被关锁新增、事务失效、redisson工具类、redis消息队列,Ctrl+P:查看方法参数Ctrl+Q:查看类.方法.属性注释_redisson 工具类

redisson 工具类

一、全局ID生成器 

每个店铺都可以发布优惠券

当用户抢购时,就会生成订单并保存到 tb_voucher_order 这张表中,而订单表如果使用数据库自增ID就存在一些问题:

id自增的问题:

  • id的规律性太明显:今天下单id=1,明天下单id=100,就暴露了销售额信息等
  • 受单表数据量的限制:数据量大的时候,单表不能保存如此多的数据。如果分表的话,每张表就会计算自己的自增长,ID就会出现重复,而订单是唯一的(违背了唯一性)

全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般满足下列特性:

  • 唯一性
  • 高可用:基本不会down机
  • 高性能:速度快
  • 递增性:变大的
  • 安全性

利用Redis的incr命令,为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其他信息---数值类型,java里的Long类型,8个字节共64位

ID的组成部分:(时间相同的情况下,序列号不一样)

  • 符号位:1bit,永远为0
  • 时间戳:31bit,以秒为单位,可以使用69年
  • 序列号:32bit秒内的计数器,支持每秒产生2的32次方的不同ID

代码实现

utils包下定义一个类 RedisIdWorker (基于Redis的Id生成器

注意:该自增是利用Redis的自增方法,我们位移后获取的是二进制因为我们左移了,32+32,二进制的低32位就是我们的count,高32位就是时间戳,如果高并发下时间戳一样,看起来就是自增的,如:

  • 97354934231498754、2022-09-20 08:26:52
  • 97354934231498755、2022-09-20 08:26:52
  • 这两个id的创建时间都是一样的,只不过序列号不一样,序列号利用Redis自增,如果时间戳不一样,那十进制看起来区别就比较大了,如:97354960001302534,正式这种方式,更好

如:

  • count=6
  • 时间戳+count左移位前=22667218 6
  • 时间戳+count左移位后的二进制:
    • 0000 0001 0101 1001 1101 1111 1101 0010 0000 0000 0000 0000 0000 0000 0000 0110
  • 移位后的二进制转十进制(整体二进制转十进制)
    • id=97354960001302534
  1. @Component
  2. @Slf4j
  3. public class RedisIdWorker {
  4. /**
  5. * 开始时间戳
  6. */
  7. private static final long BEGIN_TIMESTAMP = 1640995200L;
  8. /**
  9. * 序列号位数
  10. */
  11. private static final int COUNT_BITS = 32;
  12. private StringRedisTemplate stringRedisTemplate;
  13. public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
  14. this.stringRedisTemplate = stringRedisTemplate;
  15. }
  16. /**
  17. * 生成全局ID
  18. */
  19. public long nextId(String keyPrefix) {
  20. /**
  21. * 生成时间戳
  22. * 31位的数字,单位秒,他的值是要有一个 基础的时间 作为开始时间
  23. * 时间戳(秒数) = 当前时间 - 基础时间,
  24. * 即:从开始时间 隔了多少秒
  25. */
  26. LocalDateTime now = LocalDateTime.now();
  27. long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
  28. // 当前时间 - 基础时间 = 时间戳
  29. long timestamp = nowSecond - BEGIN_TIMESTAMP;
  30. /**
  31. * 生成序列号
  32. * 利用Redis的自增长 用字符串结构 默认一次自增 1
  33. * 不同的业务有不同的key
  34. * stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":");
  35. * 这样不可以:
  36. * 1、这样的话默认就是整个订单业务都是一个key,不管过了多少年,随着业务的订单越来越多
  37. * ,而redis 单个key的自增长是有上限的,是264次方,虽然大也是有上限的
  38. * 2、而且key里边真正用来记录序列号的,只有32bit位,如果说将来key,超过了32bit,那就存不下了。
  39. * 所以,哪怕是同一个业务,也不能使用同一个key
  40. * 解决:
  41. * 我们可以在后边拼一个当期日期 比如20220910,到了第二号,就是一个新的key20220911
  42. * 这样就还有一个统计的效果
  43. */
  44. // 获取当前日期,自定义格式化, 这样就还可以统计 年 月 日的单
  45. String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
  46. // 这里用基本类型,后面要做运算,如果key不存在,会自动创建的,不会有空指针
  47. // 注意,插到库里这个key 是一个,value就是count 回覆盖之前
  48. long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
  49. log.info("count={}", count);
  50. /**
  51. * 拼接并返回,利用位运算,
  52. * 前面说了就当做long类型8个字节,
  53. * 全局id = 0 + 时间戳 + 序列号
  54. * 将时间的戳的值左移32位,然后空出来的32位,
  55. * 利用 | 运算去将序列号填充即可
  56. *
  57. * 或运算| 一个为真即为真,现在后面的32位都是0
  58. *count的值 可能为0 可能为1,我们希望不管为0还是1都需要填充到后32
  59. * 0|0 = 0
  60. * 0|1 = 1
  61. * 即:count值将来是什么,后32位就保留什么了
  62. */
  63. return timestamp << COUNT_BITS | count;
  64. }
  65. /**
  66. * 计算基础(当前)时间秒数
  67. */
  68. public static void main(String[] args) {
  69. // of 方法可以指定年月日
  70. LocalDateTime now = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
  71. // 接收时期
  72. long second = now.toEpochSecond(ZoneOffset.UTC);
  73. System.out.println(second);
  74. }
  75. }

注意:每天一个key,方便我们去统计每天的订单量

总结

全局唯一ID生成策略:

  • UUID,是一长串16进制的,没有自增长无规律,是字符串
  • Redis自增,有规律的,例如我们的案例,是数字类型,long类型的64位数字
  • snowflak(雪花算法,也是long类型的64位数字,性能理论上来讲比Redis好,但依赖于时钟,)
  • 数据库自增,单独整一张表,N张表用的是同一张表的自增ID

Redis自增ID策略:(Redis保存key需要注意)

  • 每天一个key,方便统计订单量(每天的,月的,年的)、限定key自增的值不会让key太大,以至于超过自增的上限
  • ID结构是:时间戳+计数器

测试类:

  1. private ExecutorService es = Executors.newFixedThreadPool(500);
  2. @Test
  3. void testIdWorker() throws InterruptedException {
  4. CountDownLatch latch = new CountDownLatch(300);
  5. Runnable task = () -> {
  6. for (int i = 0; i < 100; i++) {
  7. long id = redisIdWorker.nextId("order");
  8. System.out.println("id = " + id);
  9. }
  10. latch.countDown();
  11. };
  12. long begin = System.currentTimeMillis();
  13. for (int i = 0; i < 300; i++) {
  14. es.submit(task);
  15. }
  16. latch.await();
  17. long end = System.currentTimeMillis();
  18. System.out.println("time = " + (end - begin));
  19. }

二、实现优惠券的秒杀下单

实现优惠券秒杀下单

在VoucherController中提供了一个接口,可以添加秒杀优惠券;

http://localhost:8081/voucher/seckill

添加秒杀券也是优惠券,只不过在t_voucher实体类里已经把秒杀券的信息拿到了都,

  1. {
  2. "shopId":1,
  3. "title":"100元代金券",
  4. "subTitle":"周一至周五均可使用",
  5. "rules":"全场通用\\n无需预约\\n可无限叠加\\不兑现、不找零\\n仅限堂食",
  6. "payValue":8000,
  7. "actualValue":10000,
  8. "type":1,
  9. "stock":100
  10. }

 2、实现下单接口,完成抢购功能

实现优惠券秒杀的下单功能:

下单时需要判断两点

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

 基本功能实现:

  1. @Resource
  2. private ISeckillVoucherService seckillVoucherService; // 秒杀券的业务层
  3. @Resource
  4. private IVoucherOrderService voucherOrderService; // 订单业务层
  5. @Resource
  6. private RedisIdWorker redisIdWorker; // 全局区域ID工具类
  7. /**
  8. * 当有两张表以上的操作时,要加上事务,出现问题会及时回滚,
  9. * 否则无法回滚的
  10. * @param voucherId
  11. * @return
  12. */
  13. @Override
  14. @Transactional
  15. public Result seckillVoucher(Long voucherId) {
  16. // 1、查询优惠券
  17. SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
  18. // 2、判断秒杀是否开始
  19. if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
  20. return Result.fail("秒杀活动尚未开始");
  21. }
  22. // 3、判断秒杀是否结束 结束时间在当前时间值前
  23. if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
  24. return Result.fail("秒杀活动已经结束了");
  25. }
  26. // 4、判断库存是否充足
  27. if (voucher.getStock()<1) {
  28. return Result.fail("库存不足");
  29. }
  30. // 5、扣减库存
  31. boolean isUpdate = seckillVoucherService.update().setSql("stock = stock-1").eq("voucher_id", voucherId).update();
  32. if (!isUpdate) {
  33. return Result.fail("库存不足");
  34. }
  35. // 6、创建订单
  36. VoucherOrder voucherOrder = new VoucherOrder();
  37. // 6.1 订单id
  38. long orderId = redisIdWorker.nextId("order");
  39. voucherOrder.setId(orderId);
  40. // 6.2 用户id
  41. Long userId = UserHolder.getUser().getId();
  42. voucherOrder.setUserId(userId);
  43. // 6.3 优惠券id
  44. voucherOrder.setVoucherId(voucherId);
  45. // 6.4 保存订单信息
  46. voucherOrderService.save(voucherOrder);
  47. // 7、返回订单 id
  48. return Result.ok(orderId);
  49. }

以上代码有超卖问题

三、库存超卖问题

利用JMeter模拟高并发

/voucher-order/seckill/10

注意:这里我们需要在JMeter里加一个请求头,即:用户token的请求头,否则拦截器过不去,redis里找的:authorization:6a44cf0d7b564030906ad8ed9577285c

模拟结果,order表出现109条数据,库存是-9,

这样就出现了问题,我们要卖100个券,实际却卖了109张券

出现抢占资源的(并发安全)问题

 加锁:悲观锁&乐观锁,这两个所只是一种理念

悲观锁:还有像数据库的互斥锁也是悲观锁

乐观锁:比如我查DB的优惠券库存,要更新了,更新之前我去判断一下,有没有别人修改库存,

乐观锁方式:

  • 1、版本号机制
    • 修改前判断版本号有无变化,where条件 如果查到了,那么就表示没人操作,我就可以修改,where条件如果没有查到,就说明有人操作了,重试或异常
    • 说白了,版本号法是用版本来标识数据有没有变化,我们在第一步查到的版本,和更新时的版本一致,证明就没有人更新

  • 2、CAS(比较 and set)机制
    • 在版本号的基础上做了一些简化,我们这个业务,查数据的时候,库存是要查的,更新的时候,库存也要更新,库存和版本所做的事是一样的,so可以用库存来代替版本,实现方式同理
    • 即用数据本身有无变化(库存)去判断线程是否安全

修改代码:然后继续执行JMeter,我们发现并不好用,库存只卖出了21件,订单也是21个

  1. /**
  2. * 扣减库存
  3. * 修改代码,添加乐观锁,CAS机制
  4. * 添加where条件让stock等于我们上边查到的stock值
  5. * 如果查到了,说明没人修改,如果没查到说明有人修改了
  6. * 光加这一个的话,库存并没有到0,数据库还有79
  7. */
  8. boolean isUpdate = seckillVoucherService.update()
  9. .setSql("stock=stock-1")
  10. .eq("voucher_id", voucherId)
  11. .eq("stock", seckillVoucher.getStock())
  12. .update();

券还没卖完,就返回没有库存了,这就是设计乐观锁的一个弊端,当一个线程修改成功了,其余线程也在执行,发现没有查到,就修改失败了,这就浪费了一次线程任务,虽然修改失败了,但是并没有线程安全问题,我们需要做处理,不用不等于就报错,可以直接让stock只要大于0就好

  1. / * 我们修改一下,只要stock>0即可,不一定要必须等于,ge大于的意思
  2. */
  3. boolean isUpdate = seckillVoucherService.update()
  4. .setSql("stock=stock-1")
  5. .eq("voucher_id", voucherId)
  6. .gt("stock", 0)
  7. .update();

四、一人一单(同一个用户只能下一单)

解决办法就在tb_voucher_order里userId和VoucherId 联合查询,就能确定是同一个用户

解决思路:

 代码片段:

  1. Long userId = UserHolder.getUser().getId();
  2. /**
  3. * 新添加一人一单
  4. */
  5. int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
  6. if (count>0) {
  7. return Result.fail("用户已经买过了"+count+"次!");
  8. }
  9. /**
  10. * 以下的就继续走
  11. */
  12. boolean isFlag = seckillVoucherService.update().
  13. setSql("stock = stock-1").
  14. eq("voucher_id", seckillVoucher.getVoucherId())
  15. .gt("stock", 0)
  16. .update();
  17. if (!isFlag){
  18. return Result.fail("库存不足");
  19. }

跑批后发现数据库数据没有对上,200个线程,同一用户,却跑出了10个订单,以前是一人下100单,现在是一人下了10单,问题依旧存在。即:多线程并发的情况下,上述代码,大家都去查询,查询的count都是0,然后都往下走,去创建订单了,这就又出现多线程并发问题

注意: 这里不能用乐观锁了,乐观锁是在更新数据的时候用的,而我们这块的代码是新增,只能用悲观锁了

而锁的对象是不应该是this整个对象,而应该是一人一锁,即:是用户ID

  1. public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
  2. @Resource
  3. private ISeckillVoucherService seckillVoucherService;
  4. @Resource
  5. private RedisIdWorker redisIdWorker;
  6. @Override
  7. public Result seckillVoucher(Long voucherId) {
  8. /*
  9. 1、判断是否开始
  10. 2、判断是否结束
  11. 3、判断是否有库存
  12. 4、下单
  13. 5、减库存
  14. 6、返回
  15. */
  16. SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
  17. if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) {
  18. return Result.fail("活动尚未开始");
  19. }
  20. if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) {
  21. return Result.fail("活动已结束");
  22. }
  23. Integer stock = seckillVoucher.getStock();
  24. if (stock<1) {
  25. return Result.fail("库存不足");
  26. }
  27. /**
  28. * 每个人,即:每个用户自己有独立的一把锁,
  29. * 但是,要知道每次请求来的时候,我们创建的这个userId都是一个全新的对象,因此对象变了,锁也就没有意思了。
  30. * 要求值是一样的,所以我们toString,转成字符串,锁的是值
  31. * 而toString的底层代码里,也是return new String(buf, true); new了一个字符串
  32. * 所以每调一次toString,也是一个全新字符串对象,锁对象也还是会变的,
  33. * 每次来,虽然都是1011,即使转成字符串也是一个全新的对象,还是不可以,所以我们调用
  34. * 调用toString.intern() 方法,
  35. * 简单理解为去常量池里查,如果字符串常量池里有一个equals比较为true的,那就返回池子
  36. * 中的字符串,不会new新的字符串了,这样锁的就是同一个用户了,而不同的用户就不会被锁定,这样性能就提高了
  37. * 但是,有个问题,
  38. * 我们开启事务开始执行,执行之后,先释放锁,才会提交事务,
  39. * 而事务是有spring管理的
  40. * 就是这个函数方法执行完后,由spring去做提交,而这个锁在 synchronized{} 大括号结束后已经释放了,
  41. * 锁释放了,就意味着其他线程可以进来了,而此时事务尚未提交,那有其他线程进来查询操作,我们新增的这个订单可能还没有写入数据库
  42. * 这个时候别的线程去查询可能依然不存在,存在并发安全问题,因此在里边锁的情况,锁的范围就有点小了,
  43. * 应该是事务提交之后我们再去释放锁,我们应该把整个方法锁起来
  44. *
  45. * 事务失效:非public 目标对象
  46. * 我们的事务是在另一个方法开启的,而不是在调用它的方法开启的,这就会导致spring管理的事务失效
  47. * 在方法里调用别的方法,会导致事务失效,即:目标对象,而不是代理对象了
  48. *
  49. * 这里需要去拿到事务的代理对象才可以
  50. * 借助一个API
  51. *
  52. */
  53. // return createVoucherOrder(voucherId);
  54. Long userId = UserHolder.getUser().getId();
  55. synchronized (userId.toString().intern()) {
  56. /**
  57. * 借助一个API 通过这个方法去拿到当前对象的代理对象 我们称为 普绕SEI,
  58. * 而 当前代理对象就是他的service - > IVoucherOrderService
  59. */
  60. // 这样我们就拿到了当前代理对象了
  61. // 获取代理对象,和事务有关的代理对象,然后再去调用方法函数就没有问题了
  62. IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
  63. /*
  64. 用代理对象去调用该函数,而不是用this调用,这样的话这个函数就会被spring管理了
  65. 因为proxy 这个对象是由spring创建的,所以proxy.createVoucherOrder2(voucherId, userId);
  66. 他是带有事务的这样一个函数
  67. 函数不存在的原因是因为IVoucherOrderService接口不存在这个函数,我们创建就好了
  68. 我们是在实现类里做的,我们创建一下就好了,
  69. IVoucherOrderService接口有了,我们才能基于接口去做调用
  70. 现在我们的事务才能生效
  71. 这么做的话还需要做两件事
  72. 1、新添加依赖,aspectj包下的aspectjweaver 这样一依赖,(动态代理的模式)
  73. 2、启动类添加一个注解:去暴露这个代理对象:
  74. @EnableAspectJAutoProxy(exposeProxy = true) 默认值是false
  75. 将默认值改为truefalse是不会暴露的,不暴露的去获取是获取不到的,
  76. 一但暴露设置好了,我们这样就可以拿到代理对象了
  77. IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
  78. */
  79. return proxy.createVoucherOrder2(voucherId, userId);
  80. }
  81. }
  82. @Transactional
  83. public Result createVoucherOrder1(Long voucherId, Long userId) {
  84. /**
  85. * 每个人,即:每个用户自己有独立的一把锁,
  86. * 但是,要知道每次请求来的时候,我们创建的这个userId都是一个全新的对象,因此对象变了,锁也就没有意思了。
  87. * 要求值是一样的,所以我们toString,转成字符串,锁的是值
  88. * 而toString的底层代码里,也是return new String(buf, true); new了一个字符串
  89. * 所以每调一次toString,也是一个全新字符串对象,锁对象也还是会变的,
  90. * 每次来,虽然都是1011,即使转成字符串也是一个全新的对象,还是不可以,所以我们调用
  91. * 调用toString.intern() 方法,
  92. * 简单理解为去常量池里查,如果字符串常量池里有一个equals比较为true的,那就返回池子
  93. * 中的字符串,不会new新的字符串了,这样锁的就是同一个用户了,而不同的用户就不会被锁定,这样性能就提高了
  94. * 但是,有个问题,
  95. * 我们开启事务开始执行,执行之后,先释放锁,才会提交事务,
  96. * 而事务是有spring管理的
  97. * 就是这个函数方法执行完后,由spring去做提交,而这个锁在 synchronized{} 大括号结束后已经释放了,
  98. * 锁释放了,就意味着其他线程可以进来了,而此时事务尚未提交,那有其他线程进来查询操作,我们新增的这个订单可能还没有写入数据库
  99. * 这个时候别的线程去查询可能依然不存在,存在并发安全问题,因此在里边锁的情况,锁的范围就有点小了,
  100. * 应该是事务提交之后我们再去释放锁,我们应该把整个方法锁起来
  101. * 即:我们只有 先获取锁-> 提交事务 -> 再释放锁 这样一个操作,才能保证线程安全问题
  102. */
  103. /**
  104. * 新添加一人一单
  105. */
  106. int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
  107. if (count>0) {
  108. return Result.fail("用户已经买过了"+count+"次!");
  109. }
  110. /**
  111. * 以下的就继续走
  112. */
  113. boolean isFlag = seckillVoucherService.update().
  114. setSql("stock = stock-1").
  115. eq("voucher_id", voucherId)
  116. .gt("stock", 0)
  117. .update();
  118. if (!isFlag){
  119. return Result.fail("库存不足");
  120. }
  121. VoucherOrder voucherOrder = new VoucherOrder();
  122. long orderId = redisIdWorker.nextId("order");
  123. voucherOrder.setId(orderId);
  124. voucherOrder.setUserId(userId);
  125. voucherOrder.setVoucherId(voucherId);
  126. save(voucherOrder);
  127. return Result.ok(orderId);
  128. }
  129. @Transactional
  130. public Result createVoucherOrder2(Long voucherId, Long userId) {
  131. synchronized (userId.toString().intern()) {
  132. /**
  133. * 新添加一人一单
  134. */
  135. int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
  136. if (count>0) {
  137. return Result.fail("用户已经买过了"+count+"次!");
  138. }
  139. /**
  140. * 以下的就继续走
  141. */
  142. boolean isFlag = seckillVoucherService.update().
  143. setSql("stock = stock-1").
  144. eq("voucher_id", voucherId)
  145. .gt("stock", 0)
  146. .update();
  147. if (!isFlag){
  148. return Result.fail("库存不足");
  149. }
  150. VoucherOrder voucherOrder = new VoucherOrder();
  151. long orderId = redisIdWorker.nextId("order");
  152. voucherOrder.setId(orderId);
  153. voucherOrder.setUserId(userId);
  154. voucherOrder.setVoucherId(voucherId);
  155. save(voucherOrder);
  156. return Result.ok(orderId);
  157. }
  158. }

事务失效:

1、方法内调方法,非代理方法

2、异常,只有运行时异常 和 error可以

3、Spring 事务失效的 8 大场景,看看你都遇到过几个?_oh LAN的博客-CSDN博客_spring事务失效的场景

集群模式下加锁这么搞就不可以了

一人一单的并发安全问题

  • 通过加锁可以解决在淡季情况的一人一单安全问题,但是在集群模式下不行了,虽然加了锁,但是锁不住的

service下,ctrl+D 添加一个启动类,模拟集群模式下并发访问,

集群模式下,不同的JVM锁监视器是不同对,锁对象自然也就不同了,即使是同一个值也不行

postMan两个一样的请求 http://localhost:8080/api/voucher/list/10

结果都进到锁里了,这显然是不对的

IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
  • 当我们做集群的时候,起一个服务,就相当于起了一个全新tomact,就意味起了一个全新的JVM,他们各自有自己堆栈方法区,当然也有自己常量池子
  • 也就说,我们一个锁监视器在当前JVM内部可以监视这些线程,实现互斥
  • 但是如果有多个JVM,就会有多个锁监视器,即:每一个JVM都会有一个线程是成功的
  • 如果将来集群实现10台,也就意味着并行的至少有10个线程是同时在运行的,那就又一次出现了线程安全问题
  • so
    • 在集群模式下,或分布式的系统下,有多个JVM的存在,每个JVM内部都有自己的锁,导致每个锁都会有一个线程获取,于是就出现了并行运行,就会出现线程安全问题

如何让多个JVM在集群或分布式下实现同一把锁?那就是分布式锁!

五、分布式锁

解决办法,由JVM内部锁监视器,变为同有的外部锁监视器,即:让多进程可见同一个监视器对象

什么是分布式锁:?

  • 满足分布式系统或集群模式下多进程可见并且互斥的锁
  • 必须保证三高

 

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种

redis、mysql、zookeeper

如何实现互斥:

  • mysql:mysql具备事务机制,在执行写操作的时候,mysql会自动分配一个互斥的锁,多事务之间执行就是互斥的,按照这么一个原理来操作,在我们的代码业务执行前,可以先去mysql里申请一个互斥锁,然后去执行业务,当业务执行完了以后,我们去提交事务,锁就释放了,如果抛出异常会自动触发回滚,锁也会释放,互斥效果,和锁的释放都可以实现
  • redis:是利用setnx这样的互斥命令,同一个key只有数据不存在时才能set成功,释放的话,只需要将key删了,就释放了
  • zookeeper:获取锁利用节点的唯一性和有序性实现互斥,(大部分是用有序性去实现互斥),获取最小节点去当作锁,以后释放锁的时候,只需要将最小节点删除即可

高可用:

  • mysql:依赖于mysql本身的可用性,mysql是支持主从模式的,可用性是不错的
  • redis:不仅支持主从,还支持集群模式,可用性有保障
  • zookeeper:支持集群,性能好

高性能:

  • mysql受限于它本身的性能,和redis比性能一般
  • redis性能远远高于mysql
  • zookeeper:集群强调 强的一致性,同步的话需要一定的是时间,性能消耗的时间较长,比redis差一些

安全性:

  • mysql:一旦出现异常问题,锁可以及时释放的,如果断开连接,锁会自动释放的,数据也会回滚
  • redis:setnx执行完了,一旦出现故障,是无法保障锁的释放的,没人执行删除key的动作,锁就无法释放,其他人也拿不到锁,就会出现死锁,是有问题的,我们需要自己取维护,利用key的过期机制,即:加一个过期时间,超时时间去解决安全性问题,如果过长,等待时间过多,如果太短,那我的业务还没执行完,就释放了,还需要去完善
  • cookeeper:比较好,它创建的节点一般是临时节点,一旦出现故障,临时宕机等,断开连接以后,节点会自动释放,所以锁就释放了,与mysql类似,都比redis要好

可用性上和性能来讲 redis更好。

安全性上zookeeper更好。

基于Redis的分布式锁

实现分布式锁需要实现的两个方法:

1、获取锁 (jdk获取锁又两种机制,一种是阻塞等待,等待有人释放锁为止,另一种是阻塞结束返回 一个结果,不等待。阻塞对CPU浪费,实现麻烦。这里我们利用非阻塞的

  • 互斥:确保只有一个线程获取锁,利用setnx操作
  • 添加超时释放
    • 不能太短,业务没执行完就释放,10S;
      • 如果执行setnx成功了, 就那次cccccccccccccccccccccccccccccccf6rtg8expire就宕机了呢?依然没法释放
      • 必须保证setnx 和 expire 这两者要么都成功,要么都失败,要具备原子性
        • set lock thread1 nx ex 10   换个命令具备原子性,ex:秒,(两个命令变成一个命令)

2、释放锁

  • 手动释放:del key
  • 超时释放:如果宕机了,就完蛋了,死锁,可以添加一个超时释放 expire
    • 必须保证setnx 和 expire 这两者要么都成功,要么都失败,要具备原子性
      • set lock thread1 nx ex 10   换个命令具备原子性,ex:秒,(两个命令变成一个命令)

案例:

基于Redis实现分布式锁初级版本

需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能

尝试获取锁:因为我们采用非阻塞的方法,不会用去不断重试,也不会阻塞

 实现类:利用构造方法去获取参数

  1. public class SimpleRedisLock implements ILock {
  2. private StringRedisTemplate stringRedisTemplate;
  3. private String name; // lockKey
  4. private static final String PRE_KEY = "lock:"; // lockKey前缀
  5. public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
  6. this.stringRedisTemplate = stringRedisTemplate;
  7. this.name = name;
  8. }
  9. @Override
  10. public boolean tryLock(long timeoutSec) {
  11. // 这里我们用线程的id 作为value
  12. long threadId = Thread.currentThread().getId();
  13. // 利用setnx ex:timeoutSec 超时释放时间 来实现互斥
  14. Boolean isFlag = stringRedisTemplate.opsForValue().setIfAbsent(PRE_KEY + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
  15. //这里isFlag接收的是包装类,而返回类型是非包装,防止null,报的空指针,这里用个True
  16. // truetrue falsenull 都返回false
  17. return Boolean.TRUE.equals(isFlag);
  18. }
  19. @Override
  20. public void unLock() {
  21. // 删除key 释放锁
  22. stringRedisTemplate.delete(PRE_KEY + name);
  23. }
  24. }

修改代码锁的逻辑,VoucherOrderServiceImpl类

由synchronized单体锁,变为Redis的setnx锁实现分布式锁

  1. Long userId = UserHolder.getUser().getId();
  2. // 单体锁
  3. /*synchronized (userId.toString().intern()) {
  4. IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
  5. return proxy.createOrder(voucherId, userId);
  6. }*/
  7. // redis分布式锁,实现分布式或集群下的多线程一人一单 用userId
  8. SimpleRedisLock simpleRedisLock = new SimpleRedisLock(stringRedisTemplate, "order:"+userId);
  9. boolean lock = simpleRedisLock.tryLock(5);
  10. // false 要么失败,要么重试,一人一单这里抢购显然是用失败
  11. if (!lock) {
  12. return Result.fail("不能重复抢购");
  13. }
  14. // 释放锁在 finally
  15. try {
  16. IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
  17. return proxy.createOrder(voucherId, userId);
  18. } finally {
  19. // 释放锁
  20. simpleRedisLock.unLock();
  21. }

大多数情况下setnx锁都能实现互斥,但是在极端情况下,依然会存在问题!

比如线程1拿到锁后,业务进入阻塞状态中

  • 第一种情况:是业务执行完释放
  • 第二种情况:线程1是阻塞时间超过了我们的ex时间,被释放了,此时线程2来了,拿到锁了,又开始执行业务,此时,假设线程1,执行完业务了,锁被释放了(直接释放了线程2的锁)线程1把别人的锁给释放了,此时线程2还在傻乎乎的执行自己的业务呢,不知道自己的锁已经被线程1偷偷的给释放了(真坏呀),此时线程3来了,它趁虚而入,也去获取锁,成功了,然后又开始执行业务,然后线程2执行完,又把线程3的锁给释放了,
  • 此时此刻都有两个线程拿到了锁,在执行业务,又出现了线程并发安全问题

 总结来说:

  • 锁被超时释放
  • 释放锁,释放非自己的锁(删了别人的锁)
  • 要想解决这个问题,在释放锁的时候,得去判断是否是自己的锁

 

 改进Redis的分布式锁满足

  • 在获取锁时存入线程标识,(可以用UUID标识+线程ID)之前我们用的是线程ID,他是一个递增的数字,在JVM内部,每创建一个线程,都会递增,如果是集群模式下,会有多个JVM,每个JVM都会维护这样一个递增的数字,两个JVM很有可能出现线程冲突的,要区分不同的JVM
    • 二者结合就能确保不同线程标识不一样,相同线程标识一定一样
  • 在释放锁时先获取锁中的线程标识,判断是否与当前线程标识一致
    • 如果一致则释放锁
    • 如果不一致则不释放锁,什么都不用做

修改代码SimpleRedisLock

1、添加UUID,每个tomact的UUID是不一样的,一个服务是一样的,静态static

  1. // 用hutool的 true 是把UUID 的 - 去掉,然后再拼接一个中划线-来区分线程ID
  2. private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";

2、添加标识

  1. @Override
  2. public boolean tryLock(long timeoutSec) {
  3. // 拼接
  4. System.out.println(ID_PREFIX);
  5. String threadId = ID_PREFIX + Thread.currentThread().getId();
  6. // 改造 直接把拼接的作为线程标识存进去就好了
  7. Boolean isFlag = stringRedisTemplate.opsForValue().setIfAbsent(PRE_KEY + name, threadId, timeoutSec, TimeUnit.SECONDS);
  8. //这里isFlag接收的是包装类,而返回类型是非包装,防止null,报的空指针,这里用个True
  9. // truetrue falsenull 都返回false
  10. return Boolean.TRUE.equals(isFlag);
  11. }

3、释放锁,虽然是静态的,但是因为是集群模式下,每个服务的UUID是不同的

  1. @Override
  2. public void unLock() {
  3. // 改造
  4. // 获取标识
  5. String threadId = ID_PREFIX + Thread.currentThread().getId();
  6. // 获取线程的value
  7. String id = stringRedisTemplate.opsForValue().get(PRE_KEY + name);
  8. /**
  9. * 如果标识 与 id 一致,则释放做,反之不管
  10. * 虽然uuid是静态的全局就一份,但是我们是在集群模式下,每个服务器的静态都不一样
  11. * 比如tomact1 的UUID可能永远都是123
  12. * 而tomact2的UUID可能永远都是456,这样就很明显的区分了标识
  13. */
  14. if (threadId.equals(id)) {
  15. stringRedisTemplate.delete(PRE_KEY + name);
  16. }

如果线程1超时被清除了,当线程二进来了并且加锁后,此时线程1执行完任务,线程一要释放锁,然后线程一发现,这是啥?这不是我的锁~,线程1就什么都不做了就

线程2未超时,执行完发下自己的标识与value相等,是自己的锁,则直接释放

 以上就是解决分布式锁误删问题,让我们的分布式锁更加的健壮了!

然后我们现在的并不是十全十美,完美无缺!

在某些极端的情况下依然会有问题

比如线程1获取锁,执行完业务,获取锁的标识,发现一致,然后要去释放锁,然而,就在此时,要去释放锁的中间,发生了阻塞(比如GC GC执行的时候,其实是会阻塞我们的所有代码的),一但发生阻塞了,那我就没有去释放,如果阻塞的时间足够长,就很有可能触发我们的超时时间,

搞笑的来了,此时线程2进来了,开始执行任务,突然阻塞结束了,线程一开始去释放,此时判断已经执行过了,它认为锁还是自己的,直接执行释放锁了,就把线程二的锁给释放掉了,又执行了误删

此时,又来了个线程3,获取锁成功,执行自己的任务,又出现了并发安全问题

原因:判断锁标识,和释放锁 是两个动作,这两个动作之间产生了阻塞,最后出现了问题

 解决:判断锁标识 和 释放锁 的动作,必须保证原子性,即:一起执行,不能出现间隔

首先Redis也是有事务的,但是,Redis能够保证事务的原子性,但是不能保障一致性

Redis的Lua脚本:

Redis提供了Lua脚本功能,在一个脚本中表写多条Redis命令,(Redis就会一次性去执行他们)确保这多条命令执行时的原子性。Lua是一种编程语言

2、Lua脚本多参数实现:Lua语言的下表是从1开始的

 删除成功return 1,失败renturn 0,这样一个Lua脚本就写完了,local代表声明一个参数,if的大括号没有了,用then表示,结束用end表示

简化后的脚本,返回值为1,或0 

  1. if (redis.call('get', keys[1]) == argv[1]) then
  2. return redis.call('del', keys[1])
  3. end
  4. return 0

再次改进Redis的分布式锁

需求:基于Lua脚本实现分布式锁的释放锁逻辑

提示:RedsiTimeplate调用Lua脚本的API如下

项目里定义一个Lua->unLock.lua 脚本,不要写死

  1. -- 比较线程标识与锁中的标识是否一致
  2. if(redis.call('get', KEYS[1]) == ARGV[1]) then
  3. -- 释放锁
  4. return redis.call('del', KEYS[1])
  5. end
  6. return 0
  1. /*
  2. 初始化Lua脚本,因为用的静态,只要这个类一加载,初始化就完成了,这样就不用每次释放锁去加载了,性能就提高了
  3. RedisScript是个接口,里边有个实现是 DefaultRedisScript,泛型是个返回值类型,我们这里用long 本来就是 10
  4. */
  5. private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
  6. static {
  7. /*
  8. 括号会接收一个脚本,ctrl+q 可以查看构造方法接收类型
  9. 我们可以看出来接收的是一个字符串,就是把文件里的内容当做字符串,这种模式就硬编码,不建议这么做
  10. 我们就放文件里,所以不去管他,传空
  11. 我们调用方法去设置脚本位置
  12. */
  13. UNLOCK_SCRIPT = new DefaultRedisScript<>();
  14. /*
  15. 接收参数 Resource,我们用spring提供的一个ClassPathResource,
  16. 就是在classpath下的一些资源
  17. 我们创建的unLock.lua 在resources文件下,就在classpath下,直接指定
  18. */
  19. UNLOCK_SCRIPT.setLocation(new ClassPathResource("unLock.lua"));
  20. /*
  21. * 还可以配置一下返回值类型
  22. */
  23. UNLOCK_SCRIPT.setResultType(Long.class);
  24. }

修改SimpleRedisKey 的释放锁方法,脚本有Redis帮我们执行,保证原子性(判断和释放的原子性)

  1. @Override
  2. public void unLock() {
  3. /*
  4. 調用Lua脚本,写到一个文件中 unLock.lua
  5. 参数:RedisScript 得去加载我们的Lua文件,不应该在释放锁的时候去读取
  6. 这里设置成静态的,每次执行都去读文件,产生IO流,效果不好,性能差
  7. 注意,key是传的集合,这里用集合的工具类,单元素的集合
  8. 这里不用管返回值了,释放锁嘛,成功就成功了,如果没有成功,就被别人释放了,或超时释放
  9. 这段脚本是由Redis帮我们执行的,保证原子性
  10. */
  11. stringRedisTemplate.execute(UNLOCK_SCRIPT
  12. ,Collections.singletonList(PRE_KEY + name)
  13. ,ID_PRE + Thread.currentThread().getId());
  14. }

总结:到此为止,已经实现了一个生产可用的基于Redis的分布式锁

但是:基于setnx实现的分布式锁存在下面的问题

  • 1、不可重入同一个线程无法多次获取同一把锁 (像在JDK里就有一个可重入锁的说法,,就是同一个线程可以多次获取同一把锁,比如说方法A去调用方法B,在方法A中先去获取锁,然后执行业务去调B,而B里又要获取同一把锁,显然是无法获取的,此时方法B就会去等待锁的释放,而锁又无法释放,因为方法A还没有执行完,还在调用B,这种情况下,如果锁是不可重入的,那就会产生死锁了)但是在这样的场景下哈,应该要求锁是可重入的 
  • 2、不可重试:获取锁只尝试一次就返回false,没有重试机制
  • 3、超时释放锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患->虽然之前利用判断锁标识,和Lua脚本去解决了因为超时释放而导致误删问题, 但是这个超时释放也是有可能发生的,虽然不会导致误删,但是也会导致其他隐患问题   
  • 4、主从一致性问题:如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁实现
    • 主从模式,也可简单理解为读写分离模式,当我执行写操作,执行主节点,当我们执行读操作,就去执行从节点。主节点需要把数据同步给从节点,保证主从的数据是一致的,这样就可以在多个节点上完成读的操作,提高整个服务的并发能力和高可用性,而且假如主宕机了,还可以从从节点选出一个新的主,这样可用性就比较好了,这就是主从的目的
    • 但是:主从同步是有延迟的,在极端情况下可能发生这样一个情况
      • 有一个线程在主节点获取了锁,因为获取锁是一个set操作,它是一个写操作,假如说这个写操作在主节点完成了,尚未同步给从节点的时候,可能是延迟什么的,突然主节点就宕机了,此时会选一个新的从作为主,而这个从节点没有完成同步,他是没有锁的这个标识的,也就是说这个时候其他线程可以趁虚而入,去拿到锁,这个时候就等于有多个线程拿到锁,在极端情况下就可能出现安全问题,虽然概率低(毫秒级别)

大多数场景下,我们之前实现的锁都够用了,以上4个问题,大多数情况下,也基本不会去管,如果说对锁的要求非常高,那就必须解决这4个问题

我们可以利用一个框架去解决这4个功能:Redisson

Redisson:

  • Redisson是一个在Redis的基础上实现的Java驻内存数据网格(说人话!好吧..."它是一个在 Redis基础上实现的一个分布式工具的集合")。它不仅提供了一系列的分布式的java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现

嗯。。。我们以后使用分布式锁的时候,可以就使用这个开源的框架,Redisson什么都有

1、Rediss入门

  • 引入依赖,redisson 是org的,
  1. <dependency>
  2. <groupId>org.redisson</groupId>
  3. <artifactId>redisson</artifactId>
  4. <version>3.13.6</version>
  5. </dependency>
  • config包下新建 配置Rediss客户端 RedissonConfig
  • 事实上有两种配置,还可以用yaml文件和springBoot去整合使用,但是我们这里不推荐使用redisson-springBoot-start,他会替代spring官方提供的对于Redis的那套配置和实现
  • 建议使用分布式锁的时候,自己来配置下边的Redisson客户端,不要去和springBoot里边对Redis的配置 去混在一起

 注意:tryLock();第一个参数是获取锁的最大等待时间比如1秒,期间会重试,如果1秒后还没有或得锁,那就回返回false

 改造下单业务

Redisson的tryLock();有三种构造,是线程安全的

  • 同上三参数
  • 无参数:三个参数是有默认值的,默认:
    • -1:没有等待时间,获取不到返回false
    • -1:默认是30秒
    • 秒:时间单位
  1. @Configuration
  2. public class RedissonConfig {
  3. @Bean
  4. public RedissonClient redissonClient() {
  5. Config config = new Config();
  6. config.useSingleServer().setAddress("redis://localhost:6379");
  7. return Redisson.create(config);
  8. }
  9. }
  1. -- 注入上边工具类里写的配置Redissclient
  2. @Resource
  3. private RedissonClient redissonClient;

修改VoucherOrderServiceImpl下单的实现类,经测试,它是线程安全的

  1. /**
  2. * 这两个先注释,用Redisson的
  3. */
  4. //SimpleRedisKey lock = new SimpleRedisKey(stringRedisTemplate, "order:"+userId);
  5. //boolean islock = simpleRedisKey.tryLock(5L);
  6. //利用我们配置的Redisson的锁方法
  7. RLock lock = redissonClient.getLock("order:" + userId);
  8. // 这里用无参数的 tryLock() 方法
  9. boolean isLock = lock.tryLock();
  10. if (!isLock) {
  11. return Result.fail("不允许重复抢购");
  12. }
  13. try {
  14. IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
  15. return proxy.createOrder(voucherId, userId);
  16. } finally {
  17. lock.unlock();
  18. }

2、Redisson可以重入锁原理(黑马课程java并发)

简单理解:当我获取锁的时候去判断是否有人拿了,然后判断是不是我自己拿的,也就是说是不是同一个线程(是同一个线程,就是我自己拿的,不是一个线程就是别人的),如果是同一个线程,就也让他获取锁,里边会有个计数器(会累加),回去记录获取了几次,以后释放锁的时候会去减1。当所有的业务都执行完,重入次数一定会被减成0,所以每次释放锁的时候,除了要-1,还需要去判断次数是否已经为0了,如果为0了,就可以直接删除锁了,否则不可以直接删锁

我们设计的,就是既要记录线程标识,也要记录线程的次数,显然String类型就不合适了,可以利用hash类型

Redisson多方面重置有效期,给后续任务留够充足的时间;

代码太多了,太复杂了,我们一定要保证代码的原子性,用Lua脚本去执行

  获取锁的流程:

 释放锁的流程:

利用Redisson测试可重入锁Demo,可以去打断点,根据断点去看Redis客户端

tryLock底层源码:

  • 先去判断过期时间是否不为-1,没传的情况下,默认是30秒
  1. @Slf4j
  2. @SpringBootTest
  3. class RedissonTest {
  4. @Resource
  5. private RedissonClient redissonClient;
  6. private RLock lock;
  7. @BeforeEach
  8. void setUp() {
  9. lock = redissonClient.getLock("order");
  10. }
  11. @Test
  12. void method1() throws InterruptedException {
  13. // 尝试获取锁
  14. boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
  15. if (!isLock) {
  16. log.error("获取锁失败 .... 1");
  17. return;
  18. }
  19. try {
  20. log.info("获取锁成功 .... 1");
  21. method2();
  22. log.info("开始执行业务 ... 1");
  23. } finally {
  24. log.warn("准备释放锁 .... 1");
  25. lock.unlock();
  26. }
  27. }
  28. void method2() {
  29. // 尝试获取锁
  30. boolean isLock = lock.tryLock();
  31. if (!isLock) {
  32. log.error("获取锁失败 .... 2");
  33. return;
  34. }
  35. try {
  36. log.info("获取锁成功 .... 2");
  37. log.info("开始执行业务 ... 2");
  38. } finally {
  39. log.warn("准备释放锁 .... 2");
  40. lock.unlock();
  41. }
  42. }
  43. }

他里边有一段逻辑,就是Lua脚本,redis是通过一个字符串的形式,将脚本直接写死了,我们是直接写一个文件里了

 unlock();方法同理,底层也是一段Lua脚本

解决锁的重试问题:

底层依然用的上面的Lua脚本 成功返回nil 就类似于 java 的null,失败返回的是锁的剩余有效期,pttl 是毫秒,注意返回的是一个future函数,该函数是一个异步函数(Async),返回没返回,下边的代码还不知道呢,

 返回毫秒,下边这个get是一个阻塞等待,等待返回有效期,成功返回null,失败返回剩余时间

如果失败了,会走重试,那么也不是立刻就去重试,如下

  1. return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  2. "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
  3. return nil;
  4. end;
  5. local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
  6. if (counter > 0) then
  7. redis.call('pexpire', KEYS[1], ARGV[2]);
  8. return 0;
  9. else
  10. redis.call('del', KEYS[1]);
  11. redis.call('publish', KEYS[2], ARGV[1]);
  12. return 1;
  13. end;
  14. return nil;",
  15. Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));

!=null,上边 释放锁的脚本里,有一行叫publish命令,发布一个消息通知,而subscribe叫做订阅消息,那边发布了,这边就知道了,然后才会再次尝试重新执行(他还有一个等待时间,如果说等待时间超过了锁的剩余最大等待时间结束,那就不用等待了,返回false,那么非false返回true,)

  1. -- 源码
  2. public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  3. -- 最大等待时间
  4. long time = unit.toMillis(waitTime);
  5. -- 当前时间
  6. long current = System.currentTimeMillis();
  7. long threadId = Thread.currentThread().getId();
  8. -- 获取锁的剩余时间
  9. Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
  10. -- 等于null 说明成功了
  11. if (ttl == null) {
  12. return true;
  13. } else {
  14. -- 否则 计算出最大剩余等待时间
  15. time -= System.currentTimeMillis() - current;
  16. -- 小于等于0 失败了
  17. if (time <= 0L) {
  18. this.acquireFailed(waitTime, unit, threadId);
  19. return false;
  20. } else {
  21. -- 再次计算时间
  22. current = System.currentTimeMillis();
  23. -- 订阅消息
  24. RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
  25. -- 等待时间time,就是锁的剩余最大等待时间,当RFuture在,指定时间内完成返回true
  26. -- 如果等待这个时间结束,还没有收到释放锁的通知就没必要等了,就会返回false,非就是true,就会进if里边
  27. if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
  28. if (!subscribeFuture.cancel(false)) {
  29. subscribeFuture.onComplete((res, e) -> {
  30. if (e == null) {
  31. this.unsubscribe(subscribeFuture, threadId);
  32. }
  33. });
  34. }
  35. // 然后告诉我们获取锁 失败了
  36. this.acquireFailed(waitTime, unit, threadId);
  37. return false;
  38. }

 else 相反,在最大等待时间内,收到了通知

  1. try {
  2. -- 再次获取最大等待时间,注意,是减去上边代码 current 等待的消耗时间
  3. -- time = 剩余时间-等待的时间,得到的是这次的最大等待时间
  4. time -= System.currentTimeMillis() - current;
  5. -- 如果也小于等于0,那就也return false
  6. if (time <= 0L) {
  7. this.acquireFailed(waitTime, unit, threadId);
  8. boolean var20 = false;
  9. return var20;
  10. } else {
  11. -- 反之时间依然有剩余, 那么我们就终于可以去重试了
  12. boolean var16;
  13. do {
  14. -- 得到当前时间
  15. long currentTime = System.currentTimeMillis();
  16. -- 终于开始第一次重试获取锁了
  17. ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
  18. -- 等于null 获取成功
  19. if (ttl == null) {
  20. var16 = true;
  21. return var16;
  22. }
  23. -- 否则 继续判断是否超时
  24. time -= System.currentTimeMillis() - currentTime;
  25. if (time <= 0L) {
  26. this.acquireFailed(waitTime, unit, threadId);
  27. var16 = false;
  28. return var16;
  29. }
  30. -- 如果剩余时间还有,那就继续尝试
  31. currentTime = System.currentTimeMillis();
  32. -- 不同的是这里获取的是getLatch 信号量,释放锁的会释放一个信号量,依然有最大等待时间,两种情况,
  33. -- 第一,当ttl<剩余等待时间时,就会等ttl时间,时间到了会立即释放
  34. if (ttl >= 0L && ttl < time) {
  35. ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  36. -- 否则 就等time时间,time一旦到期,你还没释放,我也就没必要等了
  37. } else {
  38. ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
  39. }
  40. -- 再看下时间
  41. time -= System.currentTimeMillis() - currentTime;
  42. -- 如果时间依旧很充足,那就不停地尝试,等待,尝试等待,不停地循环
  43. } while(time > 0L);
  44. this.acquireFailed(waitTime, unit, threadId);
  45. var16 = false;
  46. return var16;
  47. }
  48. } finally {
  49. this.unsubscribe(subscribeFuture, threadId);
  50. }

这个巧妙的机制就是他利用了消息订阅,和信号量机制,导致他不是无休止的、盲目的等待或重试机制,(释放了,尝试,释放了,尝试),这样的话,我们的Redisson锁就说明它是支持尝试机制的,只要我们给了第一个参数tryLock(long waitTime, TimeUtil unit);来解决锁的重试

解决锁的超时问题

必须确保是业务执行完 锁的释放,而不是因为阻塞了而释放,

继续根据Redisson 的tryLock();空参数方法的 tryAcquire方法

  1. public RFuture<Boolean> tryLockAsync() {
  2. // 来了一个线程
  3. return this.tryLockAsync(Thread.currentThread().getId());
  4. }
  1. private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  2. if (leaseTime != -1L) {
  3. return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  4. } else {
  5. // 释放时间=-1 ,那么看门狗的默认时间=30
  6. RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  7. // 当ttlRemainingFuture 完成返回以后,传进来剩余有效期 和 异常
  8. // 异常!=null 是抛异常了,是有问题了,就什么都不做
  9. ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
  10. if (e == null) {
  11. if (ttlRemaining) {
  12. // 自动调度 自动更新续约的一个功能
  13. this.scheduleExpirationRenewal(threadId);
  14. }
  15. }
  16. });
  17. return ttlRemainingFuture;
  18. }
  19. }
  1. private void scheduleExpirationRenewal(long threadId) {
  2. RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
  3. // 这里EXPIRATION_RENEWAL_MAP 是一个currentHashMap,是一个静态的,
  4. // 第一个参数就可以理解为key 是个字符串,可以理解为当前锁的名称,值就是entry
  5. // 是静态的,他的实例类,都可以看到map,调用的话,就都是不同的,一个锁对应一个entry
  6. // putIfAbsent,如果不存在,才往里面放,放进去的就一个全新entry,返回值就是null
  7. // 如果说不是一次来,重入的,第二次或第三次,那就不创建,返回值是第一次的entry,确保
  8. // 不管这把锁重入几次,将来拿到的都是同一个entry
  9. // 这个map的作用就是保证同一把锁,拿到的永远是同一个entry,
  10. RedissonLock.ExpirationEntry oldEntry =
  11. (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
  12. // 其实,不同的线程,是不可能拿到同一把锁的, 所以老的 的话,一定是同一个线程多次来
  13. // 其实就是一种重入,把线程ID加进去
  14. if (oldEntry != null) {
  15. // 不管是旧的,来了一个线程,就把这个线程加进来
  16. oldEntry.addThreadId(threadId);
  17. } else {
  18. // 还是新的,都会加进来
  19. entry.addThreadId(threadId);
  20. // 如果是第一次来,还会更新有效期
  21. this.renewExpiration();
  22. }
  23. }
  24. // 源码 就是同一个线程就是做了+1操作
  25. public synchronized void addThreadId(long threadId) {
  26. Integer counter = (Integer)this.threadIds.get(threadId);
  27. if (counter == null) {
  28. counter = 1;
  29. } else {
  30. counter = counter + 1;
  31. }
  32. this.threadIds.put(threadId, counter);
  33. }

如果是新的,更新有效期

  1. // 更新有效期
  2. private void renewExpiration() {
  3. // 首先从map里得到该entry
  4. RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
  5. if (ee != null) {
  6. // timeout 超时任务,或定时任务
  7. // .newTimeout(),有两个参数,1、任务本身,2、是延迟,3、时间单位,也就是说,这个任务会在时间单位到期以后才会执行,所以是一个延时的任务
  8. /* 这个延时任务过了 this.internalLockLeaseTime / 3L 才会执行 内部锁释放时间/3
  9. 因为我们没有传,默认是-1,而获取了看门狗的时间30
  10. 30/3=10,也就是10秒之后,开始执行
  11. */
  12. Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
  13. public void run(Timeout timeout) throws Exception {
  14. // 10秒后拿出entry
  15. RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
  16. if (ent != null) {
  17. // 从entry里取出线程ID
  18. Long threadId = ent.getFirstThreadId();
  19. if (threadId != null) {
  20. // 紧接着,开始刷新有效期,里边是一个Lua脚本,如下图所示
  21. // 做了两件事,1、判断锁是不是我拿的,2、尝试去更新有效期expire
  22. // 该行代码就是重置当前线程锁的有效期
  23. RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
  24. // 执行完了以后执行
  25. future.onComplete((res, e) -> {
  26. if (e != null) {
  27. RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
  28. } else {
  29. if (res) {
  30. // 递归,再调自己,10秒后又重置,总结来说就是永不过期
  31. RedissonLock.this.renewExpiration();
  32. }
  33. }
  34. });
  35. }
  36. }
  37. }
  38. }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
  39. // 最后把任务放到entry里了,所以这个entry封装的是两个东西,
  40. // 第一个:是当前线程的id
  41. // 第二个:就是这个定时的任务
  42. // 所以,上边放旧的 的时候,就不用执行刷新有效期了,
  43. ee.setTimeout(task);
  44. }
  45. }

 重置当前线程持有锁的有效期Lua

  1. // 重置有效期
  2. protected RFuture<Boolean> renewExpirationAsync(long threadId) {
  3. return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  4. "
  5. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
  6. redis.call('pexpire', KEYS[1], ARGV[1]);
  7. return 1;
  8. end;
  9. return 0;
  10. "
  11. , Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
  12. }

 释放锁:unlock

  1. public void unlock() {
  2. try {
  3. // 异步执行unlock
  4. this.get(this.unlockAsync(Thread.currentThread().getId()));
  5. } catch (RedisException var2) {
  6. if (var2.getCause() instanceof IllegalMonitorStateException) {
  7. throw (IllegalMonitorStateException)var2.getCause();
  8. } else {
  9. throw var2;
  10. }
  11. }
  12. }
  13. public RFuture<Void> unlockAsync(long threadId) {
  14. RPromise<Void> result = new RedissonPromise();
  15. // 执行完后
  16. RFuture<Boolean> future = this.unlockInnerAsync(threadId);
  17. future.onComplete((opStatus, e) -> {
  18. // 执行取消任务
  19. this.cancelExpirationRenewal(threadId);
  20. if (e != null) {
  21. result.tryFailure(e);
  22. } else if (opStatus == null) {
  23. IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
  24. result.tryFailure(cause);
  25. } else {
  26. result.trySuccess((Object)null);
  27. }
  28. });
  29. return result;
  30. }
  31. -- 删除整个定时任务,删除后,锁的释放就完成了
  32. void cancelExpirationRenewal(Long threadId) {
  33. // 从map里获取 到 当前这把锁的任务
  34. RedissonLock.ExpirationEntry task = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
  35. if (task != null) {
  36. if (threadId != null) {
  37. // 删除 线程id
  38. task.removeThreadId(threadId);
  39. }
  40. if (threadId == null || task.hasNoThreads()) {
  41. // 取出 timeout任务
  42. Timeout timeout = task.getTimeout();
  43. if (timeout != null) {
  44. // 将任务删除
  45. timeout.cancel();
  46. }
  47. // 最后再把entry 取消掉即可
  48. EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
  49. }
  50. }
  51. }

获取锁流程:

释放锁的流程,右侧是释放锁,左侧流程图是获取锁

 

Redisson解决主从一致性问题

redisson分布式锁主从一致性问题

产生的原因:

  • 主从之间会做数据的同步,主节点会不断的把数据同步给从节点,确保主从之间数据是一致的,但是毕竟不是同一台机器,主从之间会有一定的延迟,数据的同步也会有一定的延迟,尽管时间很短

redisson是如何解决主从一致性问题呢的?

  • 不要主从了,所有的节点都变成独立的Redis节点,相互之间没有任何关系,没有主从,都可以去做读写,此时获取锁的方式就变了
  • 获取锁的方式:依次的向多个节点都去获取锁,都保存了锁的标识,才算获取锁成功。
    • 首先没有主从,不会有一致性问题
    • 真的有一个节点宕机了,别的节点依然存活着,锁依然有效
    • 可用性,随着节点的增多,越来越高
    • 而且再提高可用性,还可以添加主从同步,也不会有主从一致性问题
      • 假如有一个节点真的宕机了,刚好没有完成同步,然后从节点升级为主节点
      • 我们上边说的,只有所有节点都拿到锁成功,才算成功,有一个能拿到,另外两个拿不到,会失败的
      • 也就是说,如果发生宕机了,但是只要有一个节点存活着,其他线程就不可能拿到锁,不会出现锁失效问题
      • 这种方案在redis里面有个名词---联锁

 搭建三台redis节点,形成一个联锁

修改Redisson配置,例如:如下三个独立的RedissClient,利用这三个分别获取独立的锁,将三个独立的锁联在一起,变成联锁

  1. @Configuration
  2. public class RedissonConfig {
  3. @Bean
  4. public RedissonClient redissonClient() {
  5. Config config = new Config();
  6. config.useSingleServer().setAddress("redis://localhost:6379");
  7. return Redisson.create(config);
  8. }
  9. @Bean
  10. public RedissonClient redissonClient2() {
  11. Config config = new Config();
  12. config.useSingleServer().setAddress("redis://localhost:6380");
  13. return Redisson.create(config);
  14. }
  15. @Bean
  16. public RedissonClient redissonClient3() {
  17. Config config = new Config();
  18. config.useSingleServer().setAddress("redis://localhost:6390");
  19. return Redisson.create(config);
  20. }
  21. }

新建单元测试,创建联锁:

  1. @Slf4j
  2. @SpringBootTest
  3. class RedissonTest {
  4. @Resource
  5. private RedissonClient redissonClient;
  6. @Resource
  7. private RedissonClient redissonClient2;
  8. @Resource
  9. private RedissonClient redissonClient3;
  10. private RLock lock;
  11. @BeforeEach
  12. void setUp() {
  13. RLock lock = redissonClient.getLock("order");
  14. RLock lock2 = redissonClient2.getLock("order");
  15. RLock lock3 = redissonClient3.getLock("order");
  16. // 创建联锁 multiLock
  17. RLock multiLock = redissonClient.getMultiLock(lock, lock2, lock3);
  18. }

点进去,进去发现底层代码是new的,也就是说,我去调的时候都是new的,不光redissonClient调可以,redissonClient2 和redissonClient3 调都是一样的,底层都是重新new的,传进来的参数是一个可变数组,即使将来我自己new 都没有问题

  1. public RLock getMultiLock(RLock... locks) {
  2. return new RedissonMultiLock(locks);
  3. }

再跟进去,可以看出来是将可变参数转成了集合,放到了成员变量里,也就是说多个独立的锁,都放到集合locks里了,按照联锁的原理,以后是会把这个集合里的锁都去尝试获取一遍,都成功了,才算成功,别的代码都不需要动即可

  1. final List<RLock> locks = new ArrayList();
  2. public RedissonMultiLock(RLock... locks) {
  3. if (locks.length == 0) {
  4. throw new IllegalArgumentException("Lock objects are not defined");
  5. } else {
  6. this.locks.addAll(Arrays.asList(locks));
  7. }
  8. }

打断点可知,所谓的联锁,就是多个独立的锁,而每个独立的锁和之前的原理是一样的,包括获取 技术+1 -1,删除锁,原理都一样

联锁的tryLock:底层代码分析

  1. boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
  2. // 应为我们没传释放时间,默认是-1
  3. public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
  4. return this.tryLock(waitTime, -1L, unit);
  5. }
  1. public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  2. /*
  3. 这一段,我们只有传了过期时间才会走
  4. 对于释放时间的处理
  5. */
  6. long newLeaseTime = -1L;
  7. if (leaseTime != -1L) {
  8. // 如果是-1 说明只想获取一次,也就是说不重试,释放时间是多久那就是多久
  9. if (waitTime == -1L) {
  10. newLeaseTime = unit.toMillis(leaseTime);
  11. } else {
  12. // 说明锁获取失败后想要重试,就不会我们传进来的释放时间了,会把等待时间*2
  13. // 重试可能耗时比较久,万一重试时间小于等待时间,还没重试完呢,就释放了,就有问题了
  14. newLeaseTime = unit.toMillis(waitTime) * 2L;
  15. }
  16. }
  17. // 当前时间
  18. long time = System.currentTimeMillis();
  19. // 剩余时间,初始化-1
  20. long remainTime = -1L;
  21. if (waitTime != -1L) {
  22. // 如果传了等待时间,那就回做替换操作,也就是说remainTime就是等待时间
  23. remainTime = unit.toMillis(waitTime);
  24. }
  25. // 计算锁的等待时间,锁等待时间=剩余等待时间
  26. long lockWaitTime = this.calcLockWaitTime(remainTime);
  27. // 失败的锁的限制,是0
  28. int failedLocksLimit = this.failedLocksLimit();
  29. // 获取成功的锁集合,while 循环都执行完成功的,就拿到了所有的锁
  30. List<RLock> acquiredLocks = new ArrayList(this.locks.size());
  31. // 遍历我们的三个独立的锁,因为我们前边只创建了三个Redis节点
  32. ListIterator iterator = this.locks.listIterator();
  33. while(iterator.hasNext()) {
  34. // 先拿到锁
  35. RLock lock = (RLock)iterator.next();
  36. // 代表获取锁成功,或失败
  37. boolean lockAcquired;
  38. try {
  39. // tryLock为空参数的情况,只一次,不重试,尝试是否获取到锁
  40. if (waitTime == -1L && leaseTime == -1L) {
  41. lockAcquired = lock.tryLock();
  42. } else {
  43. // 非空参数的情况,带有重试的,尝试是否获取到锁
  44. long awaitTime = Math.min(lockWaitTime, remainTime);
  45. lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
  46. }
  47. } catch (RedisResponseTimeoutException var21) {
  48. this.unlockInner(Arrays.asList(lock));
  49. lockAcquired = false;
  50. } catch (Exception var22) {
  51. lockAcquired = false;
  52. }
  53. // 去判断获取锁有没有成功
  54. if (lockAcquired) {
  55. // 成功的话就把锁放到 已经成功的锁的集合里
  56. acquiredLocks.add(lock);
  57. // 获取锁失败,没有获取到锁
  58. } else {
  59. // 锁的总数 - 已经获取的锁的数量 == 0,5-2==3false
  60. if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) {
  61. // 已经获取的锁的数量=锁的总数,才会=0 ,才能跳出循环,只有把锁都拿到了才会结束
  62. break;
  63. }
  64. //0
  65. if (failedLocksLimit == 0) {
  66. this.unlockInner(acquiredLocks);
  67. // 如果是-1 证明不想做重试,那就一次失败,直接失败,返回false
  68. if (waitTime == -1L) {
  69. return false;
  70. }
  71. // 如果想重试
  72. failedLocksLimit = this.failedLocksLimit();
  73. // 先把已经拿到的锁清空,即使一把锁都没拿到
  74. acquiredLocks.clear();
  75. // 把迭代器往前迭代,就是把指针指向第一个,因为要重试,重头再来,从第一把锁开始
  76. while(iterator.hasPrevious()) {
  77. // 循环往前迭代,循环完最终指针就指向了第一个元素,然后结束,又开始新一轮for循环(这里是最外层的while循环),也就是说,要么把所有的锁都拿到,要么失败了重试,直到等待时间小于0,超时为止!,这就是整个获取锁的流程
  78. iterator.previous();
  79. }
  80. } else {
  81. --failedLocksLimit;
  82. }
  83. }
  84. // 判断剩余等待时间是否是-1,如果不是,说明剩余时间很充足
  85. if (remainTime != -1L) {
  86. // 得到现在剩余时间
  87. remainTime -= System.currentTimeMillis() - time;
  88. time = System.currentTimeMillis();
  89. // 如果剩余时间小于等于0,说明刚才获取锁,已经把等待时间给耗尽了,代表获取锁超时了,只能false失败,但是,在false 会先去把已经获取到的这些锁释放掉(集合),因为已经失败了,前边拿到的锁就不能再拿了,拿了别人就拿不到了,干脆就释放掉了
  90. if (remainTime <= 0L) {
  91. this.unlockInner(acquiredLocks);
  92. return false;
  93. }
  94. }
  95. // 如果时间还很充足,那就开始下一次循环了,正好while循环结束,开始下一次循环
  96. }
  97. // leaseTime!=-1 在结束前才会执行这段逻辑。锁的释放时间,我们之前是没有传的
  98. // leaseTime ==-1 不传就会触发看门狗机制,默认30,而且会自动去续期,所以不需要这里去处理
  99. // 如果传了就没了,需要自己指定锁的释放时间
  100. // 推荐释放时间不传,不要设置,用看门狗的就可以了
  101. if (leaseTime != -1L) {
  102. // 传了的逻辑
  103. List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size());
  104. Iterator var24 = acquiredLocks.iterator();
  105. // 遍历拿到的每一把锁
  106. while(var24.hasNext()) {
  107. RLock rLock = (RLock)var24.next();
  108. // 会执行expireAsync expire命令,设置有效期,也就是说会给没把所都重新设置一下有效期,因为我们获取锁的时候会有多个redis节点依次获取,第一把锁在获取之后,立即就开始倒计时了,而最后一把锁再获取后 会刚开始倒计时,也就是说,到这个循环的时候,第一把锁的剩余有效期,一定会比最后一把锁的剩余有效期的时间要短一些,这样就有可能出现一些问题,有些锁释放了,有些没有释放的情况,避免这个问题的发生,这行代码的意思就是,等所有锁都拿到了,给所有的锁都重新设置一下有效期,确保大家的有效期是一样的
  109. RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
  110. futures.add(future);
  111. }
  112. var24 = futures.iterator();
  113. while(var24.hasNext()) {
  114. RFuture<Boolean> rFuture = (RFuture)var24.next();
  115. rFuture.syncUninterruptibly();
  116. }
  117. }
  118. // 成功了 返回true
  119. return true;
  120. }

总结:

六、Redis优化秒杀

客户端---通过nginx---负载均衡到tomact---tomact内部大量的请求(串行执行)下图箭头这四步都会直接打到DB,而数据库本身的并发能力就差,而且还有写操作,避免分布式问题,我们还加了锁,所以相对来说整个业务的耗时就较长,虽然也是毫秒级别的

  • 可以多线程执行,查询的线程(判断秒杀资格);
  • 写操作的线程(开启独立线程执行)

利用Redis完成对秒杀库存的判断,和一人一单的判断

优惠券库存判断:利用String

  • key:就是优惠券id
  • value:就是库存的值(获取之后,记得-1)

一人一单功能:需要记录当前业务的优惠券,被那些用户购买过,以后再有用户来的时候,只需要判断是否存在,存在证明购买过,那就不能在买了

  • 满足一个key 保存多个值,(优惠券,被多人购买)
  • 优惠券里保存的用户id不能重复(一人一单 )
  • set集合,确保元素的唯一性,并且可以一个key里保存多个值,有用户要买,只需要在一个key里记录id即可,如果有重复的值,就不允许购买了,一人一单就实现了

  业务逻辑:0 是有购买资格,1、2没有购买资格,业务较长,要确保原子性(利用Lua命令)

利用Lua脚本来实现

如果是0 才会有购买资格,放入队列里,然后开启一个新的独立线程,来读取提前保存好客户的信息,就可以异步的来完成数据库的写的操作了;

在返回订单id给用户的那一刻,其实秒杀业务已经结束了,用户拿到了id 就可以去付款了,至于我们什么时候完成下单信息,将优惠券相关信息写到数据库的操作,时效性上已经不那么高了。

正是因为我们将同步的写数据操作,变成了异步操作,

  • 一方面缩短了秒杀业务的流程,大大提高了秒杀业务的并发
  • 另一方面还减轻了数据库的压力

 需求:

  • 1、新增秒杀优惠券的同时,将优惠券信息保存到Redis中

VoucherServiceImpl类,添加优惠券信息,这个库存可以永久保存到Redis当中,将来移除秒杀的时候,只需要手动删除即可

  1. @Resource
  2. private StringRedisTemplate stringRedisTemplate;
  3. @Override
  4. @Transactional
  5. public void addSeckillVoucher(Voucher voucher) {
  6. // 保存优惠券
  7. save(voucher);
  8. // 保存秒杀信息
  9. SeckillVoucher seckillVoucher = new SeckillVoucher();
  10. seckillVoucher.setVoucherId(voucher.getId());
  11. seckillVoucher.setStock(voucher.getStock());
  12. seckillVoucher.setBeginTime(voucher.getBeginTime());
  13. seckillVoucher.setEndTime(voucher.getEndTime());
  14. seckillVoucherService.save(seckillVoucher);
  15. // 1、新增秒杀优惠券的同时,将优惠券信息保存到Redis中,利用String结构,这里不需要配置剩余有效期,如果将来我们不需要了,再手动将它删除
  16. stringRedisTemplate.opsForValue().
  17. set(RedisConstants.SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
  18. }
  19. 工具类定义常量
  20. public static final String SECKILL_STOCK_KEY = "seckill:stock:";

然后利用postman去发起请求添加秒杀券即可,报文利用之前的即可

  • 2、基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
  1. -- 1.参数列表
  2. -- 1.1.优惠券id
  3. local voucherId = ARGV[1]
  4. -- 1.2.用户id
  5. local userId = ARGV[2]
  6. -- 2.数据key
  7. -- 2.1.库存key 注意:lua脚本里 字符串拼接是用.. 而不是用+ 加号了
  8. local stockKey = 'seckill:stock:' .. voucherId
  9. -- 2.2.订单key
  10. local orderKey = 'seckill:order:' .. voucherId
  11. -- 业务1 判断库存是否充足
  12. if (tonumber(redis.call('get', stockKey) <= 0)) then
  13. return 1;
  14. end
  15. -- 业务2 判断用户是否下单,等于1存在,等于0不存在
  16. if (redis.call('sismember', orderKey, userId) == 1) then
  17. return 2;
  18. end
  19. -- 业务3 说明库存充足且用户没有下单
  20. -- 3.1 扣库存 利用incrby -1 修改库存
  21. redis.call('incrby', stockKey, -1)
  22. -- 3.2 下单保存用户 用sadd key faild
  23. redis.call('sadd', orderKey, userId)
  24. -- 成功返回0
  25. return 0

脚本写完后需要在java代码里去执行脚本

VoucherOrderServiceImpl类修改实现方法seckillVoucher

1、引入lua脚本

  1. private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
  2. static {
  3. SECKILL_SCRIPT = new DefaultRedisScript<>();
  4. SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
  5. SECKILL_SCRIPT.setResultType(Long.class);
  6. }

2、修改代码

  1. public Result seckillVoucher(Long voucherId) {
  2. // 获取用户id
  3. Long userId = UserHolder.getUser().getId();
  4. // 1.执行lua脚本,注意,我们这里没有传key,但是也不能传null"",它是一个空集合
  5. Long result = stringRedisTemplate.execute(
  6. SECKILL_SCRIPT,
  7. Collections.emptyList(),
  8. voucherId.toString(), userId.toString()
  9. );
  10. // 2.判断结果是否为0,这里是Long类型,包装类转换基本类型然后再去比较
  11. int r = result.intValue();
  12. if (r != 0) {
  13. // 2.1.不为0 ,代表没有购买资格
  14. Result.fail(r == 1 ? "库存不足" :"客户已下单");
  15. }
  16. // 2.20 将任务放到阻塞队列里
  17. long orderId = redisIdWorker.nextId("order");
  18. // 3.返回订单id
  19. return Result.ok(orderId);
  20. }
  • 3、如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
  • 4、开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

修改代码

  1. public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
  2. @Resource
  3. private ISeckillVoucherService seckillVoucherService;
  4. @Resource
  5. private RedisIdWorker redisIdWorker;
  6. @Resource
  7. private RedissonClient redissonClient;
  8. @Resource
  9. private StringRedisTemplate stringRedisTemplate;
  10. private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
  11. static {
  12. SECKILL_SCRIPT = new DefaultRedisScript<>();
  13. SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
  14. SECKILL_SCRIPT.setResultType(Long.class);
  15. }
  16. // 定义一个线程池,单线程的
  17. private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
  18. // 定义一个阻塞队列,范型就是封装的类型,参数是队列的大小
  19. private static final BlockingQueue<VoucherOrder> orderTask = new ArrayBlockingQueue<VoucherOrder>(1024*1024);
  20. // 客户随时都在下单,这个类一加载完成后,就应该开始从阻塞队列里取数据了,利用springBoot的一个注解@PostConstruct注解来实现
  21. @PostConstruct // 这个注解的意思就是,下边的这个方法在当前类初始化完成后就开始执行
  22. private void init() {
  23. SECKILL_ORDER_EXECUTOR.submit(new handlerVoucherOrder());
  24. }
  25. // 定义一个线程任务,什么时候去执行?肯定是在用户秒杀抢购之前开始,因为用户一旦秒杀,任务就应该去从阻塞队列里取出订单信息
  26. private class handlerVoucherOrder implements Runnable{
  27. @Override
  28. public void run() {
  29. // 不断的从队列里取,定义一个死循环
  30. while (true) {
  31. // 从队列里获取订单信息,尽管while(true),但是,他是队列,有才取,没有就等待
  32. try {
  33. VoucherOrder voucherOrder = orderTask.take();
  34. // 创建订单
  35. proxy.createVoucherOrder(voucherOrder);
  36. } catch (Exception e) {
  37. log.info("订单创建异常", e);
  38. }
  39. }
  40. }
  41. }
  42. @Transactional
  43. public void createVoucherOrder(VoucherOrder voucherOrder) {
  44. // 注意,这里不不能用UserHolder来取,线程变了,会取不到的
  45. Long userId = voucherOrder.getUserId();
  46. Long voucherId = voucherOrder.getVoucherId();
  47. // 创建锁对象,其实这里获取锁,是一个兜底方案,也可以不用的,算是兜底防止redis崩了
  48. RLock redisLock = redissonClient.getLock("lock:order:" + userId);
  49. // 尝试获取锁
  50. boolean isLock = redisLock.tryLock();
  51. // 判断
  52. if (!isLock) {
  53. // 获取锁失败,直接返回失败或者重试,没必要返回了,直接是后段操作,不用返回前段,没意义
  54. log.error("不允许重复下单!");
  55. return;
  56. }
  57. // 以下代码是操作数据库的,前边我们只操作了redis缓存里的,这里是数组异步操作
  58. try {
  59. // 5.1.查询订单
  60. int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
  61. // 5.2.判断是否存在
  62. if (count > 0) {
  63. // 用户已经购买过了
  64. log.error("不允许重复下单!");
  65. return;
  66. }
  67. // 6.扣减库存
  68. boolean success = seckillVoucherService.update()
  69. .setSql("stock = stock - 1") // set stock = stock - 1
  70. .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
  71. .update();
  72. if (!success) {
  73. // 扣减失败
  74. log.error("库存不足!");
  75. return;
  76. }
  77. // 这里直接save即可
  78. save(voucherOrder);
  79. } finally {
  80. // 释放锁
  81. redisLock.unlock();
  82. }
  83. }
  84. // 代理定义成一个成员变量
  85. private IVoucherOrderService proxy = null;
  86. @Override
  87. public Result seckillVoucher(Long voucherId) {
  88. // 获取用户id
  89. Long userId = UserHolder.getUser().getId();
  90. // 1.执行lua脚本,注意,我们这里没有传key,但是也不能传null"",它是一个空集合
  91. Long result = stringRedisTemplate.execute(
  92. SECKILL_SCRIPT,
  93. Collections.emptyList(),
  94. voucherId.toString(), userId.toString()
  95. );
  96. // 2.判断结果是否为0,这里是Long类型,包装类转换基本类型然后再去比较
  97. int r = result.intValue();
  98. if (r != 0) {
  99. // 2.1.不为0 ,代表没有购买资格
  100. return Result.fail(r == 1 ? "库存不足" :"客户已下单");
  101. }
  102. // 2.20 将任务放到阻塞队列里,将订单id 用户id 代金券id 进行封装
  103. VoucherOrder voucherOrder = new VoucherOrder();
  104. // 2.3.订单id
  105. long orderId = redisIdWorker.nextId("order");
  106. voucherOrder.setId(orderId);
  107. // 2.4.用户id
  108. voucherOrder.setUserId(userId);
  109. // 2.5.代金券id
  110. voucherOrder.setVoucherId(voucherId);
  111. // 2.6.放入阻塞队列
  112. orderTask.add(voucherOrder);
  113. /*
  114. 2.7 添加代理对象,只有在主线程 才能实现代理,子线程是不可以的,因为它的底层也是ThreadLocal 去获取的,所以将代理对象
  115. 放在主线程里,要么当作参数传,要么写到成员变量
  116. 这里我们当作成员变量,放到主线程里去提前获取
  117. */
  118. proxy = (IVoucherOrderService) AopContext.currentProxy();
  119. // 3.返回订单id
  120. return Result.ok(orderId);
  121. }

 但是,基于JVM阻塞队列实现秒杀会有两个问题,

内存限制:

  • BlockIngQueue 是JDK的存在JVM里,阻塞队列有大小,如果说队列存满了仍有下单数据在进来怎么办?

数据安全问题:

  • 现在是基于内存来体现的,如果内存突然宕机了,那所有的订单信息就都丢失了,会导致数据安全问题。
  • 如果说有一个线程从队列里取出来一个任务开始执行,如果说突然报错了,那这个任务也就丢失了,队列里也不会再有了,也会导致数据安全问题

七、Redis-消息队列实现异步秒杀(3种)

消息队列可以理解为:快递员-快递柜-我

快递员送快递送多个人,将快递放到快递柜,每个人去拿自己,完成

如果说快递员送快递,快递柜没位置了,我又在公司上班,当我回家了,快递员又不上班,放在门口又怕丢,两边难受,而有了快递柜,快递员只需要将快递放在快递柜里,只要发个短信,我们什么时候有时间就去拿就好了,这样就解除了快递员和我们的耦合 

为什么用消息队列?

  • 消息队列是在JVM以外的,是一个独立的服务,不受JVM内存的限制,解决了上面第一个为题
  • 消息队列不仅仅是做数据存储,还可以确保我们的数据安全,我们存到消息队列里的消息它要做持久化,不过服务宕机还是重启数据都不会丢失,而且它还要消息投递给消费者后,要求消费者做确认,如果没有确认,那么这个消息就依然在消息队列中存在,下一次会再次投递给消费者,让它继续处理,直到成功为止,确保消息至少消费一次

市面上的消息队列,像阿里的消息队列、kafka等,搭建消息队列还是费点劲的

Redis有自己的消息队列

redis-基于list结构模拟消息队列

先进先出的形式,就符合了队列的特征,但是我们的java代码,将来实现的时候要消息队列有消息就去取消息,如果没有消息,我们期望的是卡在那里,等将来有消息了我们再去取,实现一个阻塞的效果,因此可以用BRPOP和BLPOP

 首先list是在内存之外的,而且redis是支持持久化的,数据安全性有保障

但是无法避免消息丢失,如果取出一个数据,list的pop是取出并且移除了,如果服务挂了,那就丢失了,其他消费者也拿不到,而且他还是单消费者,无法实现一条消息被多消费者消费的,有的业务需要多消费者消费的

redis-PupSub消息队列

可以实现多消费者,比较灵活,可以发布消息让一个消费者拿到,也可以发布消息让多个消费者拿到

psubscribe 是有通配符的,如:?是一个自负,*是任意,[ ad ] 制定字符

如果生产者发送的是:publist order.queue1,那么只有第二个消费可以拿到,第一个消费者是拿不到的,因为第二个消费者拿的是通配符的,第一个是固定的

redis-server 启动redis

redis-cli:开始redis客户端口,可以开启多个,如下,输入order.q2的时候只有一个窗口可以收到

缺点:不支持持久化,list支持是因为list本身就不是一个消息队列,是一个链表,用来做数据存储的,只不过我们把它当作消息队列来用了,Redis用来做数据存储的 都是支持数据持久化的。

而pubSub本身设计出来就是用来做消息发送的,如果说消息发送了,而这个频道没有被任何人订阅,那么这个频道就丢失了,也就是说我们发送的所有消息不会在redis里保存,如果没人收,直接就没了,不要了,这就是最大的问题。

不支持持久化,安全性没有保障

而且消息堆积有上线,虽然发送的消息不在内存中保存,发送的消息如果有消费者监听,会在消费者那里有一个缓存区域,把这个消息缓存下来,接下来消费者去处理,如果消费者处理的比较慢,处理一条消息需要1秒,如果说此时又来了10几条消息,那么这些消息都会缓存在客户端那里,就是消费者那里,而消费者那里的缓存空间是有上限的,如果超出了就会丢失

Redis消息队列-stream

  • 是一种全新的数据类型,是支持数据持久化的
  • 设置队列长度,如果消息队列的长度设置了1000,此时1000都满了还没有处理,此时又来了新的消息,那么会自动去剔除一些旧的消息,消息上限可以不用给值,表示不设置上限
  • *:代表消息的唯一id,表示由Redis自动生成

同一个消息,两个消费者都可以读取,说明不是单消费者的,而且在stream里,消息是永久存在的,读完之后是不会删除的,这里是从第一条消息,下标0开始的 

 如果想读最新消息那就用$,下边返回nil 是因为第一条消息已经读取过了,没有最新消息

如果想加上阻塞读取,就是有消息就读取,没消息就等待,那就加上bluck 后边跟一个阻塞毫秒时间,0是永久等待,如下,阻塞等待中,直到2发送了一条消息,1才读取到

阻塞读取了

基于stram实现阻塞队列,java伪代码,利用死循环来实现阻塞取,等待,无限尝试

注意当指定$时候,代表我们拿到的是最新消息,就是最后一条消息,可能会出现漏读消息的情况,比如当我们处理消息的时候,此时又来了7、8条数据,那么它只会读取最后一条数据 ,这样就出现漏读的情况了

总结:

消息可回溯:就是消息永久存在,不会丢失

利用消费者组-来解决漏读问题

将多个消费者分到一个组中,监听同一队列

  • 消息分流:就是说多个消费者只要在一个组里,他们的关系就是竞争关系,凡事进入到这个组的消息,大家就会去抢,一定程度上可以避免消息堆积的问题。如果说这个消息就想被多个消费者消费,那就加多个消费者组就可以了
  • 消息标识:可以解决单消费时漏消息
  • 消息确认:确保所有消息,只要拿到了,最少会被消费一次

一般情况下并不需要我们手动添加消费者,因为当我们从这个组当中指定一个消费者并且监听消息的时候,如果它发现这个消费者不存在,就是自动帮我们创建 

  • count 1:一次查询一条
  • noack:最好不要设置这个,设置了就变成手动ACK,就不会自动确认了,会出现漏读问题
  • ID:其他:比如给的是0,从pending-list里读取一个 消费了,那么list里就少了一个,移除了;下次还给0,读到的还是未消费的,和上一个不一样了,上一个已经消费了,就从list里移除了;也就是说永远给0,永远从list里读到最新的消息,直到把list里的消息都处理完为止 
  • 因此:正常情况都应该给>,读取那些未消费的消息,如果出现异常情况,我们再去pending-list里读,读取那些拿到了,但是未处理的(就是出了异常的消息),这也就是id配置的技巧了

 案例:注意:一个组内,只有一个标记,所以消费者2 和消费者1 每次拿到的都不一样

 虽然读了这么多消息,但是都没有确认过,利用XACK命令进行确认 

xack key group id[id...],执行后确认,这种是正常情况 key队列名称,group组的名称

 异常情况:刚拿到一条消息,处理的时候,挂嘞,这条消息没有确认ack,消费者只要拿到消息,一定会进去pending-list里,那么如何查看pending-list呢

 查看pending-list里的消息

命令:【】获取消息以后,确认之前的空闲时间 ,比如获取5000毫秒以上的消息,可以不给,即所有的我都要

start end pending-list里的 起始和结束的位置如:- + 最小到最大

 查询pending-list的消息,读取pending-list里的第一条消息

这就又一次拿到了k6了,可以再次去处理嘞 

然后去处理,确认ack,之后再去pending-list里拿,就没有了,这样就将正常消息,和异常消息全都处理嘞

 伪代码:所有消息处理都必须成功,实在不成功那就人工去处理,catch的代码,又空判断,是因为如果pending-list处理完确认后,会再次返回,然后没有了的话不就返回null了,然后就可以break了,确保消息一定被执行

 

  • 独立于JVM以外的,内存不受JVM限制
  • 可以做持久化,安全性有保障
  • 有确认机制,确保消息至少被消费一次等等加上上边5个

缺点,Redis的持久化,也不是万无一失的,也有丢失风险的,而且stream,只支持消费者的确认机制,是不支持生产者的确认机制的,建议还是用专业的消息队列如阿里的,还有消费的事务机制等,其实一般来说,stream已经可以满足中小型企业的需要了

 java代码实现stream

  • 1、直接利用命令行创建消息队列 

xgroup create stream.orders g1 0 mkstream 队列不存在时自动创建队列 stream.orders 是队列,g1 是组; 因为是新创建的队列,id直接 为0即可

  • 2、改造lua脚本:seckill.lua,发消息
  • 新添加参数orderId
  1. -- 1.3.订单id
  2. local orderId = ARGV[3]

保存用户后立刻将数据发送到消息队列里,用stream xadd 

  1. -- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
  2. redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)

修改VoucherOrderServiceImpl类

  1. // 代理定义成一个成员变量
  2. private IVoucherOrderService proxy = null;
  3. @Override
  4. public Result seckillVoucher(Long voucherId) {
  5. // 获取用户id
  6. Long userId = UserHolder.getUser().getId();
  7. // 2.3.订单id
  8. long orderId = redisIdWorker.nextId("order");
  9. // 1.执行lua脚本,注意,我们这里没有传key,但是也不能传null 或"",它是一个空集合
  10. Long result = stringRedisTemplate.execute(
  11. SECKILL_SCRIPT,
  12. Collections.emptyList(),
  13. voucherId.toString(), userId.toString(), String.valueOf(orderId)
  14. );
  15. // 2.判断结果是否为0,这里是Long类型,包装类转换基本类型然后再去比较
  16. int r = result.intValue();
  17. if (r != 0) {
  18. // 2.1.不为0 ,代表没有购买资格
  19. return Result.fail(r == 1 ? "库存不足" :"客户已下单");
  20. }
  21. // 如下代码就省略了,不用放到阻塞队列里嘞,直接利用lua脚本放到消息队列里
  22. /*
  23. // 2.2 为0 将任务放到阻塞队列里,将订单id 用户id 代金券id 进行封装
  24. VoucherOrder voucherOrder = new VoucherOrder();
  25. voucherOrder.setId(orderId);
  26. // 2.4.用户id
  27. voucherOrder.setUserId(userId);
  28. // 2.5.代金券id
  29. voucherOrder.setVoucherId(voucherId);
  30. // 2.6.放入阻塞队列
  31. orderTask.add(voucherOrder);*/
  32. /*
  33. 2.7 添加代理对象,只有在主线程 才能实现代理,子线程是不可以的,因为它的底层也是ThreadLocal 去获取的,所以将代理对象
  34. 放在主线程里,要么当作参数传,要么写到成员变量
  35. 这里我们当作成员变量,放到主线程里去提前获取
  36. */
  37. proxy = (IVoucherOrderService) AopContext.currentProxy();
  38. // 3.返回订单id
  39. return Result.ok(orderId);
  40. }
  • 3、下单完成后收消息 

阻塞队列就不需要了,修改阻塞队列这一块的逻辑,如下的代码,全都注释,不要了

  1. // 定义一个阻塞队列,范型就是封装的类型,参数是队列的大小
  2. private static final BlockingQueue<VoucherOrder> orderTask = new ArrayBlockingQueue<VoucherOrder>(1024*1024);
  3. // 定义一个线程任务,什么时候去执行?肯定是在用户秒杀抢购之前开始,因为用户一旦秒杀,任务就应该去从阻塞队列里取出订单信息
  4. private class handlerVoucherOrder implements Runnable{
  5. @Override
  6. public void run() {
  7. // 不断的从队列里取,定义一个死循环
  8. while (true) {
  9. // 从队列里获取订单信息,尽管while(true),但是,他是队列,有才取,没有就等待
  10. try {
  11. VoucherOrder voucherOrder = orderTask.take();
  12. // 创建订单
  13. proxy.createVoucherOrder(voucherOrder);
  14. } catch (Exception e) {
  15. log.info("订单创建异常", e);
  16. }
  17. }
  18. }
  19. }

新增加一个线程任务

  1. private class handlerVoucherOrder implements Runnable{
  2. String queueName = "stream.orders";// 和lua脚本的名字一定不要写错,写错就报错了
  3. @Override
  4. public void run() {
  5. while (true) {
  6. try {
  7. // 获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams streams.orders >
  8. // consumer 消费者的参数; options选项 条数,阻塞时间; streamOffset偏移量 指定队列名称 和 id
  9. // 返回list 集合,可能是读取1条,有可能是多条,count
  10. List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
  11. Consumer.from("g1", "c1"),
  12. StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
  13. StreamOffset.create(queueName, ReadOffset.lastConsumed())
  14. );
  15. // 判断消息是否为空
  16. if (list == null || list.isEmpty()) {
  17. // 如果失败,说明没有消息,继续下一次循环
  18. continue;
  19. }
  20. // 如果成功,可以下单
  21. // 解析消息中的订单list, string 是id 后边的两个object 是键值对,就是我们lua脚本里的 插入消息的键值对
  22. MapRecord<String, Object, Object> record = list.get(0);
  23. Map<Object, Object> value = record.getValue();
  24. // map 转对象
  25. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
  26. createVoucherOrder(voucherOrder);
  27. // ack 确认 SACK stream.orders g1 id
  28. stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
  29. } catch (Exception e) {
  30. handlePendingList();
  31. log.info("订单创建异常", e);
  32. }
  33. }
  34. }
  35. private void handlePendingList() {
  36. while (true) {
  37. try {
  38. // 获取pending-list中的订单信息 xreadgroup group g1 c1 count 1 streams streams.orders 0
  39. // consumer 消费者的参数; options选项 条数,阻塞时间; streamOffset偏移量 指定队列名称 和 id
  40. // 返回list 集合,可能是读取1条,有可能是多条,count
  41. List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
  42. Consumer.from("g1", "c1"),
  43. StreamReadOptions.empty().count(1),
  44. StreamOffset.create(queueName, ReadOffset.from("0")) // 没有0的常量,可以自己传
  45. );
  46. // 判断消息是否为空
  47. if (list == null || list.isEmpty()) {
  48. // 如果失败,说明没有消息,结束循环
  49. break;
  50. }
  51. // 如果成功,可以下单
  52. // 解析pending-list的订单list, string 是id 后边的两个object 是键值对,就是我们lua脚本里的 插入消息的键值对
  53. MapRecord<String, Object, Object> record = list.get(0);
  54. Map<Object, Object> value = record.getValue();
  55. // map 转对象
  56. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
  57. createVoucherOrder(voucherOrder);
  58. // ack 确认 SACK stream.orders g1 id
  59. stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
  60. } catch (Exception e) {
  61. log.info("订单创建异常", e);
  62. try {
  63. Thread.sleep(50);
  64. } catch (InterruptedException interruptedException) {
  65. interruptedException.printStackTrace();
  66. }
  67. }
  68. }
  69. }
  70. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/571021
推荐阅读
相关标签
  

闽ICP备14008679号