当前位置:   article > 正文

RabbitMQ如何做到不丢不重_生产环境rabbitmq如何保证消息不丢

生产环境rabbitmq如何保证消息不丢

目录

MQTT协议

如何保证消息100%不丢失

生产端可靠性投递

​编辑

RabbitMQ的Broker端投

(1)消息持久化

(2)设置集群镜像模式

(3)消息补偿机制

消费端

ACK机制改为手动

总结


MQTT协议

先来说下MQTT协议中的3种语义,这个非常重要。

在MQTT协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

At most once:至多一次。消息在传递时,最多会被送达一次。也就是说,没什么消息可靠性保证,允许丢消息。
At least once:至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级 这个服务质量标准不仅适用于MQTT,对所有的消息队列都是适用的。现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括RocketMQ、RabbitMQ和Kafka都是这样。也就是说,消息队列很难保证消息不重复。

At least once+幂等消费=Exactly once

如何保证消息100%不丢失

消息从生产端到消费端消费要经过3个步骤:

  1. 生产端发送消息到RabbitMQ;
  2. RabbitMQ发送消息到消费端;
  3. 消费端消费这条消息;  

所以要保证消息不丢,就得从三个方面入手,分别是生产端、RabbitMQ的Broker端、消费端。三个都保证不丢失,才能保证100%不丢。

生产端可靠性投递

事务机制:

  1. // 设置channel开启事务
  2. rabbitTemplate.setChannelTransacted(true);
  3. @Bean
  4. public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory)
  5. {
  6. return new RabbitTransactionManager(connectionFactory);
  7. }
  8. @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
  9. public void publishMessage(String message) throws Exception {
  10. rabbitTemplate.setMandatory(true);
  11. rabbitTemplate.convertAndSend("java",message);
  12. }

confirm消息确认机制:

  1. # 开启发送确认
  2. spring.rabbitmq.publisher-confirm-type=correlated
  3. # 开启发送失败回退
  4. spring.rabbitmq.publisher-returns=true
  1. @Configuration
  2. @Slf4j
  3. public class RabbitMQConfig {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @PostConstruct
  7. public void enableConfirmCallback() {
  8. //confirm 监听,当消息成功发到交换机 ack = true,没有发送到交换机 ack = false
  9. //correlationData 可在发送时指定消息唯一 id
  10. rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  11. if(!ack){
  12. //记录日志、落库定时任务扫描重发,同时对开发人员进行通知
  13. }
  14. });
  15. //当消息成功发送到交换机没有路由到队列触发此监听
  16. rabbitTemplate.setReturnsCallback(returned -> {
  17. //记录日志、落库、同时对开发人员进行通知
  18. });
  19. }
  20. }

一般不推荐事务的模式,因为是同步的会影响性能,所以都会采用异步回调的confirm模式。

RabbitMQ的Broker端投

说三点:

(1)要保证rabbitMQ不丢失消息,那么就需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息;

(2)如果rabbitMQ单点故障怎么办,这种情况倒不会造成消息丢失,这里就要提到rabbitMQ的3种安装模式,单机模式、普通集群模式、镜像集群模式,这里要保证rabbitMQ的高可用就要配合HAPROXY做镜像集群模式

(3)如果硬盘坏掉怎么保证消息不丢失

(1)消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

所以就要对消息进行持久化处理。如何持久化,下面具体说明下:

要想做到消息持久化,必须满足以下三个条件,缺一不可。

1) Exchange 设置持久化

2)Queue 设置持久化

3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

(2)设置集群镜像模式

我们先来介绍下RabbitMQ三种部署模式:

1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。

2)普通模式:消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。

3)镜像模式:消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。

如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

下面介绍下三种HA策略模式

1)同步至所有的

2)同步最多N个机器

3)只同步至符合指定名称的nodes

命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式 rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}' rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式 rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)为每个以“node.”开头的队列分配指定的节点做镜像 rabbitmqctl set_policy ha-nodes "^nodes." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 镜像队列有一个很大的缺点就是:系统的吞吐量会有所下降。

(3)消息补偿机制

为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,

但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

1)生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚

2)根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理。

消费端
ACK机制改为手动

RabbitMQ的自动ack机制默认在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完。
我们需要进行手动消费

  1. #开启手动ACK,消费消息的时候,就必须发送ack确认,不然消息永远还在队列中
  2. spring.rabbitmq.listener.simple.acknowledge-mode=manual

 basicNack 方法的第三个参数代表是否重回队列,通常代码的报错并不会因为重试就能解决,所以可能这种情况:继续被消费,继续报错,重回队列,继续被消费…死循环。
一定要有重发消息次数的限制,或者干脆不入队,发送到Redis进行下记录也行。一般就不会再次入队了,而是记录并通知开发人员,进行手动处理

  1. @RabbitHandler
  2. @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
  3. public void process(String msg, Message message, Channel channel) {
  4. long tag = message.getMessageProperties().getDeliveryTag();
  5. Action action = Action.SUCCESS;
  6. try {
  7. System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + msg);
  8. if ("bad".equals(msg)) {
  9. throw new IllegalArgumentException("测试:抛出可重回队列的异常");
  10. }
  11. if ("error".equals(msg)) {
  12. throw new Exception("测试:抛出无需重回队列的异常");
  13. }
  14. } catch (IllegalArgumentException e1) {
  15. e1.printStackTrace();
  16. //根据异常的类型判断,设置action是可重试的,还是无需重试的
  17. action = Action.RETRY;
  18. } catch (Exception e2) {
  19. //打印异常
  20. e2.printStackTrace();
  21. //根据异常的类型判断,设置action是可重试的,还是无需重试的
  22. action = Action.REJECT;
  23. } finally {
  24. try {
  25. if (action == Action.SUCCESS) {
  26. //multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息
  27. channel.basicAck(tag, false);
  28. } else if (action == Action.RETRY) {
  29. //Nack,拒绝策略,消息重回队列
  30. channel.basicNack(tag, false, true);
  31. } else {
  32. //Nack,拒绝策略,并且从队列中删除
  33. channel.basicNack(tag, false, false);
  34. }
  35. channel.close();
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }
  41. }
总结

如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。

生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。

mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。

消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。

通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/772984
推荐阅读
相关标签
  

闽ICP备14008679号