当前位置:   article > 正文

RabbitMQ解决幂等性问题_rabbitmq消息幂等性

rabbitmq消息幂等性

目录

1、什么是消息的幂等性

2、消息重试机制

3、消息重试的过程中可能会存在重复消费问题,如何解决?

4、总结


1、什么是消息的幂等性

        用户对于同一操作发起的一次请求或者多次请求的结果是一致的。

        如果一个插入操作的消费者,插入完成之后的代码发生了异常,这样就抛出异常,然后rabbitmq就会使用重试机制,这样这个消费者就会不断的往数据库里插入该订单数据,这样就会产生多条同样的订单数据,造成结果不一致的。

2、消息重试机制

重试机制实现原理:

  • @RabbitHandler注解 底层使用Aop拦截,如果程序(消费者)没有抛出异常,自动提交事务
    如果Aop使用异常通知拦截获取到异常后,自动实现补偿机制,消息缓存在RabbitMQ服务器端

重试机制:

当我们消费者处理执行我们业务代码的时候,如果抛出异常:

        在这时候mq自动触发重试机制,默认的情况下rabbitmq是无限次数的重试,需要人为指定重试次数限制问题。

  1. spring:
  2. rabbitmq:
  3. # 连接地址
  4. host: 127.0.0.1
  5. # 端口号
  6. port: 5672
  7. # 账号
  8. username: guest
  9. # 密码
  10. password: guest
  11. # 地址(类似于数据库的概念)
  12. virtual-host: /admin_vhost
  13. # 消费者监听相关配置
  14. listener:
  15. simple:
  16. retry:
  17. # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
  18. enabled: true
  19. # 最大重试次数
  20. max-attempts: 5
  21. # 重试间隔时间(毫秒)
  22. initial-interval: 3000

哪些情况需要重试?哪些情况不需要?

A、消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试?

        该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。

B、消费者获取消息后,因为代码问题抛出数据异常,是否需要重试?

        该情况下是不需要实现重试策略,就算重试多次,最终还是失败的。可以将日志存放起来,后期通过定时任务或者人工补偿形式。

        还可以使用第三方接口自动进行补偿(补偿,就行将没有消费的消息按照消费逻辑处理):RabbitMQ在消费者消费发生异常时,消费者 会调用第三方接口进行补偿,可以根据返回结果判断是否成功:

  • 成功:正常消费
  • 失败:手动抛处一个异常,这时RabbitMQ自动给我们做重试 (补偿)。

3、消息重试的过程中可能会存在重复消费问题,如何解决?

模拟会发生重试的消费者:

  1. @Slf4j
  2. @Component
  3. @RabbitListener(queues = "fanout_order_queue")
  4. public class FanoutOrderConsumer {
  5. @Autowired
  6. private OrderManager orderManager;
  7. @Autowired
  8. private OrderMapper orderMapper;
  9. @RabbitHandler
  10. public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
  11. log.info(">>orderEntity:{}<<", orderEntity.toString());
  12. //保存订单
  13. int result = orderManager.addOrder(orderEntity);
  14. //执行到这里会抛出异常 此时rabbitmq会发生重试 这样该订单就会在数据库中存入多份
  15. int i=1/0;
  16. log.info(">>插入数据库中数据成功<<");
  17. }
  18. }

         投递消息前,生成一个全局的订单id,然后封装到orderEntiry实体类中。消费者插入订单前,先通过该订单id去数据库中查询这个订单,如果存在就结束 不存在,就往数据库中插入。

  1. import com.alibaba.fastjson.JSONObject;
  2. import com.mayikt.entity.OrderEntity;
  3. import com.mayikt.manager.OrderManager;
  4. import com.mayikt.mapper.OrderMapper;
  5. import com.rabbitmq.client.Channel;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.apache.commons.lang3.StringUtils;
  8. import org.springframework.amqp.core.Message;
  9. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  10. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. import java.io.IOException;
  14. @Slf4j
  15. @Component
  16. @RabbitListener(queues = "fanout_order_queue")
  17. public class FanoutOrderConsumer {
  18. @Autowired
  19. private OrderManager orderManager;
  20. @Autowired
  21. private OrderMapper orderMapper;
  22. @RabbitHandler
  23. public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
  24. try {
  25. log.info(">>orderEntity:{}<<", orderEntity.toString());
  26. //获取订单号,
  27. String orderId = orderEntity.getOrderId();
  28. //订单号为空直接结束
  29. if (StringUtils.isEmpty(orderId)) {
  30. return;
  31. }
  32. //防止重复消费 全局id
  33. OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);
  34. //如果发生重试 会先看刚订单是否已经插入到数据库 如果存在 就 renturn;结束掉
  35. if (dbOrderEntity != null) {
  36. log.info("另外消费者已经处理过该业务逻辑");
  37. //手动确认消息 已经消费
  38. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  39. return;
  40. }
  41. //保存订单
  42. int result = orderManager.addOrder(orderEntity);
  43. log.info(">>插入数据库中数据成功<<");
  44. //手动确认消息 已经消费
  45. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  46. } catch (Exception e) {
  47. //1、记录该消息日志形式 存放数据库db中、后期通过定时任务实现消息补偿、人工实现补偿
  48. //2、将该消息存放到死信队列中,单独写一个死信消费者实现消费。
  49. //3、也可以调用第三方接口进行补偿,如果补偿失败 就抛异常,让rabbitmq进行重试
  50. }
  51. }
  52. }

4、总结

        如果消费者处理消息时,因为代码原因抛出异常是需要从新发布版本才能解决的,那么就不需要重试,重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期人工实现补偿

      

       

        

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/710060
推荐阅读
相关标签
  

闽ICP备14008679号