当前位置:   article > 正文

基于redis实现消息队列(更推荐使用专业的mq)_redis实现队列

redis实现队列

目录

 利用redis实现消息队列(基于list,点对点模型)——lpush存放队列(lpush 队列名 队列内容(可一次存放多个内容,用空格隔开)) brpop取队列(brpop 队列名 等待时间单位秒(一次取一个))

 基于pubsub实现消息队列(发布订阅模型)

 基于stream的消息队列

方法一:

存放消息

 读取消息

​编辑  案例

 特点:

 方法二:消费者组

 创建

 读取

 java实现思路

 特点

三种方式的对比:

 案例,更改之前的案例


基于基于秒杀-----分布式锁----lua脚本_xzm_的博客-CSDN博客改进

 利用redis实现消息队列基于list,点对点模型)——lpush存放队列(lpush 队列名 队列内容(可一次存放多个内容,用空格隔开)) brpop取队列(brpop 队列名 等待时间单位秒(一次取一个))

优缺点:

 基于pubsub实现消息队列(发布订阅模型)

 

 优缺点:

 基于stream的消息队列

方法一:

存放消息

 读取消息

  案例

 特点:

 方法二:消费者组

 创建

 读取

 java实现思路

 特点

三种方式的对比:

 个人感觉:mq>stream>list>pubsub

 案例,更改之前的案例

创建消息队列

 修改Lua秒杀脚本

  1. ---
  2. --- Generated by EmmyLua(https://github.com/EmmyLua)
  3. --- Created by Lenovo.
  4. --- DateTime: 2023/5/30 16:55
  5. ---
  6. -- 1.参数列表
  7. -- 1.1 . 优惠卷id
  8. local voucherId= ARGV[1]
  9. -- 1.2. 用户id
  10. local userId = ARGV[2]
  11. -- 1.3 订单id
  12. local orderId = ARGV[3]
  13. -- 2.数据key
  14. --2.1 库存key
  15. local stockKey = 'seckill:stock:' .. voucherId
  16. --2.2 订单key
  17. local orderKey = 'seckill:order:' .. voucherId
  18. -- 3. 脚本业务
  19. -- 3.1. 判断库存是否充足 get stockKey
  20. if (tonumber(redis.call('get',stockKey)) <= 0 ) then
  21. -- 3.2. 库存不足,返回1
  22. return 1
  23. end
  24. -- 3.2. 判断用户是否下单 SISMEMBER orderKey userId
  25. if (redis.call('SISMEMBER',orderKey,userId) == 1) then
  26. -- 3.3. 存在,说明是重复下单,返回2
  27. return 2
  28. end
  29. -- 3.4. 扣库存 incrby stockKey -1
  30. redis.call('incrby',stockKey, -1)
  31. -- 3.5. 下单(保存用户) sadd orderKey userId
  32. redis.call('sadd',orderKey,userId)
  33. -- 3.6 发送消息到队列中,xadd stream.orders * k1 v1 k2 v2 ...
  34. redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
  35. return 0

修改java代码实现发送消息

  1. /**
  2. * 基于stream的实现秒杀
  3. * @param voucherId
  4. * @return
  5. */
  6. public Result seckillVoucher(Long voucherId) {
  7. //获取用户
  8. Long userId = UserHolder.getUser().getId();
  9. //生成订单id
  10. long orderId = redisIdWorker.nextId("order");
  11. //执行lua脚本
  12. Long result = stringRedisTemplate.execute(
  13. SECKILL_SCRIPT,//脚本文件
  14. Collections.emptyList(),//key的集合
  15. voucherId.toString(), userId.toString() ,String.valueOf(orderId)//三个ARGV
  16. );
  17. //将返回值转换为int
  18. int i = result.intValue();
  19. //判断结果是否为零
  20. if (i != 0 ){
  21. //不为零,无法购买 ---1为库存不足,2为用户已经下单
  22. return Result.fail(i == 1 ? "库存不足" : "不能重复下单");
  23. }
  24. //可以购买
  25. //获取代理对象
  26. proxy = (IVoucherOrderService) AopContext.currentProxy();
  27. //返回订单id
  28. return Result.ok(orderId);
  29. }

 实现消息的消费

  1. //创建线程池
  2. private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
  3. @PostConstruct
  4. private void init(){
  5. SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
  6. }
  7. private class VoucherOrderHandler implements Runnable{
  8. String queueName="stream.orders";
  9. @Override
  10. public void run() {
  11. while (true){
  12. try {
  13. //1.获取消息队列中的订单消息,xreadgroup group g1 c1 count 1 block 2000 streams streams.order >
  14. List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
  15. Consumer.from("g1", "c1"),
  16. StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
  17. StreamOffset.create(queueName, ReadOffset.lastConsumed())
  18. );
  19. //2.判断消息是否获取成功
  20. if (list ==null || list.isEmpty()){
  21. //获取失败,进行下一次循环
  22. continue;
  23. }
  24. //3.解析消息中的订单信息
  25. MapRecord<String, Object, Object> entries = list.get(0);
  26. Map<Object, Object> value = entries.getValue();
  27. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
  28. //2.创建订单
  29. handleVoucherOrder(voucherOrder);
  30. //ack确认 ack stream.orders g1 id
  31. stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());
  32. } catch (Exception e) {
  33. log.error("处理订单异常",e);
  34. handlePendingList();
  35. }
  36. }
  37. }
  38. private void handlePendingList(){
  39. while (true){
  40. try {
  41. //1.获取pending-list中的订单消息,xreadgroup group g1 c1 count 1 streams streams.order >
  42. List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
  43. Consumer.from("g1", "c1"),
  44. StreamReadOptions.empty().count(1),
  45. StreamOffset.create(queueName, ReadOffset.from("0"))
  46. );
  47. //2.判断消息是否获取成功
  48. if (list ==null || list.isEmpty()){
  49. //获取失败,退出循环
  50. break;
  51. }
  52. //3.解析消息中的订单信息
  53. MapRecord<String, Object, Object> entries = list.get(0);
  54. Map<Object, Object> value = entries.getValue();
  55. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
  56. //2.创建订单
  57. handleVoucherOrder(voucherOrder);
  58. //ack确认 ack stream.orders g1 id
  59. stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",entries.getId());
  60. } catch (Exception e) {
  61. log.error("处理pending-list订单异常",e);
  62. try {
  63. Thread.sleep(20);
  64. } catch (InterruptedException ex) {
  65. ex.printStackTrace();
  66. }
  67. }
  68. }
  69. }
  70. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/395937
推荐阅读
相关标签
  

闽ICP备14008679号