当前位置:   article > 正文

RabbitMQ如何保证消息不丢失

RabbitMQ如何保证消息不丢失

RabbitMQ消息丢失的三种情况

第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。

第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。

RabbitMQ消息丢失解决方案 

针对生产者 

1、开启RabbitMQ事务

可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

  1. // 开启事务
  2. channel.txSelect();
  3. try {
  4. // 这里发送消息
  5. } catch (Exception e) {
  6. channel.txRollback();
  7. // 这里再次重发这条消息
  8. }
  9. // 提交事务
  10. channel.txCommit();

缺点:

RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。

2、使用confirm机制

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的

在生产者开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

  1. //开启confirm
  2. channel.confirm();
  3. //发送成功回调
  4. public void ack(String messageId){
  5. }
  6. // 发送失败回调
  7. public void nack(String messageId){
  8. //重发该消息
  9. }

针对RabbitMQ

RabbitMQ 自己弄丢了数据,这个必须要开启 RabbitMQ消息的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。

解决办法:

设置持久化有两个关注点:

第一个是创建 queue 的时候将其设置为持久化

第二个是发送消息的时候将消息设置为持久化

第一种:元配置持久化

在创建交换机和队列的时候,将其设置为持久化,这样就算重启RabbitMQ或者突然断电,元数据信息也会从磁盘中进行读取,这样就可以保证RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。

  1. // 1. 创建持久化交换器 如果不存在自动创建
  2. channel.exchangeDeclare(rabbitConfigDTO.getExchange(), BuiltinExchangeType.TOPIC, true);
  3. // 2. 创建持久化队列 如果不存在自动创建
  4. channel.queueDeclare(rabbitConfigDTO.getQueue(), true, false, false, null);
  5. channel.queueBind(rabbitConfigDTO.getQueue(), rabbitConfigDTO.getExchange(), rabbitConfigDTO.getRoutingkey());

第二种:消息持久化

发送消息的时候将消息的deliveryMode设置为2,或者是

MessageProperties.PERSISTENT_TEXT_PLAIN 就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

下面是采用springboot整合的rabbitTemplate来实现消息发送 

  1. MessageProperties props = new MessageProperties();
  2. props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置为持久化模式
  3. Message message = new Message(payload, props);
  4. rabbitTemplate.send(exchange, routingKey, message);
  5. // 或者直接传递Message对象,其中包含已设置持久化的MessageProperties
  6. rabbitTemplate.convertAndSend(exchange, routingKey, new Message(messageBody, props));
  7. // 或者使用MessagePostProcessor接口动态设置消息属性
  8. rabbitTemplate.convertAndSend(
  9. exchange,
  10. routingKey,
  11. messageBody,
  12. new MessagePostProcessor() {
  13. @Override
  14. public Message postProcessMessage(Message message) throws AmqpException {
  15. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  16. return message;
  17. }
  18. }
  19. );

下面是采用SpringBoot中通过new来创建连接来实现消息发送

  1. Connection connection = new RabbitMQUtils(rabbitConnectionUrlDTO.getIp(), rabbitConnectionUrlDTO.getPort(), rabbitConnectionUrlDTO.getVirtualhost(), rabbitConnectionUrlDTO.getUsername(), rabbitConnectionUrlDTO.getPassword()).getConnection();
  2. if (Objects.nonNull(connection)) {
  3. log.warn("共" + RABBIT_CONNECTION_URL_LIST.size() + "个连接!");
  4. Channel channel = connection.createChannel();
  5. //消息持久化到磁盘,避免因为突然断电或重启导致消息丢失
  6. channel.basicPublish(rabbitConnectionUrlDTO.getExchange(), rabbitConnectionUrlDTO.getRoutingkey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSONObject.toJSONString(map).getBytes(StandardCharsets.UTF_8));
  7. log.warn("连接RabbitMQ成功!" + "IP地址为" + rabbitConnectionUrlDTO.getIp() + "端口号为" + rabbitConnectionUrlDTO.getPort() + "交换器为" + rabbitConnectionUrlDTO.getExchange() + "队列名为" + rabbitConnectionUrlDTO.getQueue() + "路由键为" + rabbitConnectionUrlDTO.getRoutingkey());
  8. }

 消费者确认机制

如果上述生产端消息队列都正确投递,那么问题出现在消费端是否可以正确消费?

消费者在成功处理了一条消息后通知RabbitMQ,这样RabbitMQ在收到确认后才会移除队列中的消息。

默认情况下,以下3种原因导致消息丢失:

1、 网络故障:消费端还没接收到消息之前,发生网络故障导致消息丢失;

2、 未接收消息前服务宕机:消费端突然挂机未接收到消息,此时消息会丢失;

3、 处理过程中服务宕机:消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。

这是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后,不管消费端是否接收到,是否处理完,就立即删除这条消息,导致消息丢失。

应对方案:

  • 将自动ack机制改为手动ack机制。

  1. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  2. try {
  3. //接收消息,业务处理
  4. //设置手动确认
  5. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  6. } catch (Exception e) {
  7. //发生异常时,可以选择重新发送消息或进行错误处理
  8. // 例如,可以选择负确认(nack),让消息重回队列
  9. // channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
  10. }
  11. };
  12. //设置autoAck为false,表示关闭自动确认机制,改为手动确认
  13. channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});

消息补偿机制 

以上3种解决办法理论上可靠,但是系统的异常或者故障比较偶然,我们没法做到100%消息不丢失。因此需要介入补偿机制或者人工干预。这是我们的最后一道防线。

如何做消息补偿呢?其实就是将消息入库,通过定时任务重新发送失败的消息。详细流程如下:

  • 生产端发送消息;

  • 确认失败,将消息保存到数据库中,并设置初始状态0;

  • 定时任务以一定频率扫描数据库中status=0 的消息(失败消息);

  • 重发消息,可多次;

  • 重发成功,更新数据库:status=1;

  • 超过固定次数重发仍然失败,人工干预。

标注:

超过最大失败次数后,对于无法被正常消费的消息可移入死信队列

  • 可人工干预手动排查

  • 也可自动重试,需要实现一个消费者来从死信队列中获取消息,并根据业务逻辑来决定是否以及如何重新发送消息。这里涉及到消息去重、幂等性处理等。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/1015824
推荐阅读
相关标签
  

闽ICP备14008679号