当前位置:   article > 正文

RabbitMQ的消息丢失、消息重复、消息积压问题_消息丢失,重复消费,消息积压

消息丢失,重复消费,消息积压

上一篇文章中我介绍了RabbitMQ实现分布式最终一致性的开发方案,这篇就来解决一下这个方案中存在的一些问题。http://t.csdnimg.cn/aOYTH

首先,RabbitMQ的三大问题:消息丢失、消息重复、消息积压

 这三个问题中最严重的就是消息丢失的问题。那我我们就来反向介绍一下这几个问题的最简单处理办法。

一、消息积压

消息积压,顾名思义就是给MQ发的消息太多了太快了,消费者消费不过来了。

 解决办法1: 我是富豪我就是有钱,ok,那我们就疯狂加服务器加消费者,直到能够消费的过来。

 解决办法2: 我是穷波一,好吧,穷波一得有自己苟活的法呀,我们再上线一个专门的队列消费服务,但是我们这个服务不去做对应的业务逻辑处理,而是直接把这些消息全部原封不动的存到我们的数据库,这样至少不会丢掉消息,我们记录到数据库,后续我们再慢慢的去处理这些消息就好啦。

举个栗子:这就像是淘宝买东西一样,假如我们的自己写的一个淘宝服务哈,我们写的很low,消费者处理消息很慢,可能我们的服务只能支持一分钟30个人购买,买的人再多就只能消息积压在MQ中,然后再有人继续购买就越等越久,最后导致积压,处理不过来,然后只能被迫给用户返回一个超时错误,用户一看,真垃圾,不买了,我们到手的money飞了。这种事我们怎么允许呢。money、money、money啊,到手的飞了,不可能!这时候我们就上线一个专门的消费者处理这些消息,因为我们不做业务逻辑处理,直接存表,所以我们的速度很快。这样我们一分钟可能就能够支持300个人购买了。太好了,money没飞掉,舒服。过后我们的服务再慢慢处理这些消息,哪怕是我第二天了才处理完,也没有事,money已经到账了,这是不是想想都开心(*^▽^*)。

二、消息重复

顾名思义 消息发送了两次或者多次

解决办法1: 其实通常我们的消费者服务接口都会实现接口的幂等性。(幂等性不懂的看我以前的文章吧~http://t.csdnimg.cn/X3rmg) 

解决办法2: 加防重表,用mysql啊redis啊都可以,唉这个就不想讲了,听到这个名字大家应该都能懂什么意思吧。

三、消息丢失

我滴妈,终于讲这个最重要的了,今天一大早就开始写,有点小累。

通俗理解就是 我们给MQ发了消息,但是最后我们没有消费到。

这个问题出现才是我们最懵逼,最头疼的。

但是想看懂这下面的解决办法首先要了解RabbitMQ的消息可靠投递,不懂的可以转到我以前的文章学习一下(http://t.csdnimg.cn/pwfpF

解决办法 1:加如MQ的消息日志表。(下面直接实战代码)

还是已最通用的电商下单逻辑为例,下面的实战代码就是我上一遍文章《基于RabbitMQ实现下单减库存的最终一致性分布式事务》中的代码基础上进一步完善的。(http://t.csdnimg.cn/aOYTH

1、先创建好我们用的MQ日志记录表。

  1. public class MqMessage {
  2. @TableId(type = IdType.INPUT)
  3. private String messageId;
  4. private String content;
  5. private String toExchange;
  6. private String routingKey;
  7. private String classType;
  8. private int messageStatus;
  9. private Date createTime;
  10. private Date updateTime;
  11. }

 2、提交订单,发送给MQ消息时,加入写入日志功能。

        MqMessage实体类,记录MQ的日志。

        下单业务逻辑部分 

  1. String jsonString = JSON.toJSONString(order.getOrder());
  2. MqMessage mqMessage = new MqMessage();
  3. mqMessage.setMessageId(uuid);
  4. mqMessage.setMessageStatus(0); // 新建
  5. mqMessage.setContent(jsonString);
  6. mqMessage.setRoutingKey("order.create.order");
  7. mqMessage.setToExchange("order-event-exchange");
  8. mqMessage.setClassType(OrderEntity.class.getTypeName());
  9. mqMessage.setCreateTime(new Date());
  10. mqMessage.setUpdateTime(new Date());
  11. mqMessageService.save(mqMessage);
  12. rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder(), correlationData);

3、MQ的监听器,里面包含两部分的回调函数,可以判断消息发送在什么阶段。具体代码中也有详细注释。

  1. /**
  2. * MQ 的回调函数
  3. */
  4. @Configuration
  5. public class MyRabbitConfig {
  6. @Autowired
  7. RabbitTemplate rabbitTemplate;
  8. @Autowired
  9. MqMessageService mqMessageService;
  10. @Primary
  11. @Bean
  12. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  13. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  14. this.rabbitTemplate = rabbitTemplate;
  15. rabbitTemplate.setMessageConverter(messageConverter());
  16. initRabbitTemplate();
  17. return rabbitTemplate;
  18. }
  19. // 把消息使用json序列化的方式
  20. @Bean
  21. public MessageConverter messageConverter() {
  22. return new Jackson2JsonMessageConverter();
  23. }
  24. /**
  25. * 定制 RabbitTemplate
  26. * 1、服务器收到消息
  27. * 1、spring.rabbitmq.publisher-confirms=true
  28. * 2、设置确认回调 setConfirmCallback
  29. * 2、 消息正确抵达队列 回调
  30. * 1、## 开启发送端消息抵达队列的确认
  31. * spring.rabbitmq.publisher-returns=true
  32. * 2、## 只要抵达队列 优先异步回调
  33. * spring.rabbitmq.template.mandatory=true
  34. * 3、消费端
  35. * 1、##手动ACK确认
  36. * spring.rabbitmq.listener.simple.acknowledge-mode=manual
  37. */
  38. public void initRabbitTemplate() {
  39. /**
  40. * ACK 确认机制
  41. * @param correlationData correlation data for the callback.
  42. * @param ack true for ack, false for nack
  43. * @param cause An optional cause, for nack, when available, otherwise null.
  44. */
  45. rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  46. System.out.println("==============成功发送消息到服务器回调==========");
  47. System.out.println("correlationData:" + correlationData + ",ack:" + ack + ",cause:" + cause);
  48. if (ack) {
  49. // 消息已成功发送到Broker并得到确认
  50. System.out.println("订单 消息发送成功");
  51. String id = correlationData.getId();
  52. MqMessage mqMessage = new MqMessage();
  53. mqMessage.setMessageId(id);
  54. mqMessage.setMessageStatus(1); // 发送成功
  55. mqMessage.setUpdateTime(new Date());
  56. mqMessageService.updateById(mqMessage);
  57. // 在这里执行相应的逻辑
  58. } else {
  59. // 消息发送失败或未得到确认
  60. System.out.println("消息发送失败:" + cause);
  61. // 在这里执行相应的逻辑
  62. }
  63. });
  64. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  65. /**
  66. // * 只要消息没投递到指定的队列,就触发失败回调
  67. // * @param message the returned message.
  68. // * @param i the reply code.
  69. // * @param s the reply text.
  70. // * @param s1 the exchange.
  71. // * @param s2 the routing key.
  72. // */
  73. @Override
  74. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  75. System.out.println("=============交换机发送消息到消息队列失败回调==========");
  76. System.out.println("message===>" + message + "replyCode===>" + replyCode + "replyText==>" + replyText + "exchange==>" + exchange + "routingKey==>" + routingKey);
  77. System.out.println("消息为正确抵达队列。。。");
  78. String string = new String(message.getBody());
  79. OrderEntity orderEntity = JSON.parseObject(string, OrderEntity.class);
  80. MqMessage mqMessage = new MqMessage();
  81. mqMessage.setMessageId(orderEntity.getMqMessageId());
  82. mqMessage.setMessageStatus(2); // 错误抵达
  83. mqMessage.setUpdateTime(new Date());
  84. mqMessageService.updateById(mqMessage);
  85. }
  86. });
  87. }
  88. }

4、消费者阶段,判断是否成功处理消息。

  1. @RabbitHandler
  2. public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
  3. System.out.println("过期订单, 准备关闭。。。");
  4. try {
  5. orderService.closeOrder(orderEntity);
  6. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  7. String string = new String(message.getBody());
  8. System.out.println("记录MQ 的日志 , string=" + string);
  9. MqMessage mqMessage = new MqMessage();
  10. mqMessage.setMessageId(orderEntity.getMqMessageId());
  11. mqMessage.setMessageStatus(3); // 已抵达
  12. mqMessage.setUpdateTime(new Date());
  13. mqMessageService.updateById(mqMessage);
  14. }catch (Exception e){
  15. e.printStackTrace();
  16. channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
  17. }
  18. }

ok,看一下我们的测试结果。

这样就不怕消息丢失了。

MQ的知识应该暂时就可以撒花了。坚持就是胜利。 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号