赞
踩
大家好,我是小菜。
一个希望能够成为 吹着牛X谈架构 的男人!如果你也想成为我想成为的人,不然点个关注做个伴,让小菜不再孤单!
本文主要介绍
RabbitMQ的消息丢失问题
如有需要,可以参考
如有帮助,不忘 点赞 ❥
微信公众号已开启,小菜良记,没关注的同学们记得关注哦!
是的,最终是对 RabbitMQ 下手了!
面试中常见的RabbitMQ面试题也是多了去了,常见的如下:
这几个问题又得让你脑壳疼一阵子,是不是也在网上看了挺多博文介绍这方面的解决方案,但是却看了又忘,实际便是因为缺少实操,这篇小菜便重点讲述下 RabbitMQ 如何解决消息丢失问题~
消息可靠性问题我们又可能将其理解为如何防止消息丢失?那为什么消息会丢失呢?我们可以先看看消息投递的整个过程:
我们从图中可以从三个阶段分析可能造成消息丢失:
publisher 发送消息到 exchange
exchange 分发到 queue
queue 投递到 customer
既然我们知道了哪些阶段可能造成数据丢失,那我们就可以从源头防范于未然~!
工程结构
工程结构很简单,就是一个简单的 Spring Boot 项目,里面有个 消费者
和 生产者
两个模块
RabbitMQ 中提供了 publisher confirm
机制来避免消息发送到 MQ 的过程中丢失的问题。消息发送到 MQ 以后,会返回一个确认结果给生产者,用于表示消息是否确认成功。该确认结果存在两种请求:
publisher-confirm
该类型是 发送者确认 ,存在两种情况
ack
nack
publisher-return
该类型是 发送者回执 ,存在两种情况
ack
nack
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一ID,以区分不同消息,避免ack冲突
接下来我们用代码来说明具体的操作方式
我们首先看下 生产者
的配置文件
前面几个配置 RabbitMQ 的连接信息没啥好讲的,我们来看几个比较陌生的配置
publisher-confirm-type
开启发送确认,这里可以支持两种类型
publisher-returns
开启 public-return,同样是基于 CallBack 机制,不过是定义 ReturnCallback
template.mandatory
定义路由失败时的策略。
每个 RabbitTemplate 只能配置一个 ReturnCallback
执行发送代码之前,我们确保已经创建了(一个直连交换机
direct-exchange
,一个队列direct-queue
,且绑定的 key 为direct
正常情况下,我们执行代码肯定是发送成功的,可以看到控制台绿色输出
且我们在消息队列中也成功接收到了消息:
到这步是没有任何问题的,那我们就需要手动给它制造点问题~ 我们可以修改 交换机名称
,这个时候发送消息的时候找不到交换机,那么交换机肯定就会返回 nack
,再看是否可以进入到我们代码中的判断:
代码执行虽然是绿色的,但因为rabbitMQ找不到正确的交换机,而导致消息发送失败,也就是下图的这个过程:
这一个是 publish -> exchange
失败我们顺利的捕获到了,那么 exchange -> queue
这步的失败是我们是否能够正常捕获?我们可以通过修改 路由 key
使交换机路由不到对应的 queue
可以发现当交换机没有路由到相对应的 queue 时,也成功触发了我们自定义的回调函数,然后看 rabbitMQ 控制台是可以发现消息已经成功投递到交换机
到这里,我们通过两种简单的错误模拟,使程序都能顺利的进入到我们预先定义的回调中,如果遇到发送失败的情况,我们可以在失败的回调中自定义消息重发
机制,最大程度上避免消息丢失的问题
我们可以通过 publisher-confirm
和 publisher-return
两种错误捕获机制,来避免 生产者 -> exchange -> queue
这条链路的消息丢失
publisher-confirm
publisher-return
消息存储丢失是啥意思?其实就是持久化
的概念,当消息已经成功发送到 queue 时,这个时候如果消费者没有及时进行消费,rabbitMQ 又刚好宕机重启了,那么这个时候就会发现消息丢失了。
这是因为 MQ 默认是内存存储消息,我们可以通过开启持久化的功能来确保在 MQ 中的消息不丢失
其实我们通过 RabbitMQ 提供的 GUI 创建交换机或队列的时候就可以发现有持久化的这个选项
如果将 durability
设为 durable 后,我们可以发现无论如何重启 MQ,重启后交换机和队列依然存在。
但是很多时候我们交换机
和 队列
的创建并非在 GUI 上创建,而是通过应用代码的方式创建
默认情况下,AMQP 发出的消息都是持久化的,不用特意指定
RabbitMQ 采取的机制是当确认消息被消费者消费后就会立即删除
那么如何确认消息已被消费者消费?那就还得依靠回执来确认,消费者获取消息后,需要向 RabbitMQ 发送 ack
回执,表明自己已经处理消息。其中 ack
在 AMQP 中有三种确认模式:
上述三种方式都是通过修改配置文件:
该方式需要用户自己手动确认,灵活性较好
这个时候如果执行逻辑是正常的,那么在 RabbitMQ 上就会将该消息删除,但是如果执行的逻辑抛出了异常,没有进入到手动确认的环节,RabbitMQ 将会把该消息保留:
该方式在没有异常发生时会自动进行消息确认
我们在配置文件中将确认方式改为 auto
进行测试:
正常情况下接收消息是没有任何问题的,那我们同样制造些非正常情况:
我们手动制造了点异常,发现消息没有被 RabbitMQ 删除的同时,而且控制台一直在报错,无止境的在尝试重新消费,这如果放在线上环境难免有些令人崩溃。
当消费者出现异常后,消息会不断 requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,就会导致 MQ 的消息处理飙升
而发生这种情况的原因所在便是因为 RabbitMQ的消息失败重试机制
,但很多时候我们可能不想一直重试,只需要经过几次尝试,如果失败就放弃处理,这个时候我们就需要在配置文件中配置失败重试机制:
开启该配置后,我们重启项目进行观察
通过控制台可以看到在重试 3 次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException
,说明本地重试机制生效了。而且我们回到 RabbitMQ 控制台可以看到对应消息被删除了,说明最后 SpringAMQP 返回的是 ack,导致消息被 MQ 删除
但是这种处理方式并不优雅
,重试后直接删除消息过于 暴力,那么有没有更好的处理方式?答案是有的!
我们可以利用 AMQP 提供的 MessageRecovery
接口来实现,该接口有三种不同的实现方式:
三种方式可以根据不同场景进行采用,分析一下,不难发现第三种 RepublishMessageRecoverer
是比较优雅的~ 当重试失败后会将消息投递到一个指定专门存放异常消息的队列,后续由人工集中进行处理!具体使用方式如下:
通过自定义异常处理后,我们重启项目查看控制台:
可以发现重试3次后,我们的异常消息进入到了我们自定义的异常队列中
该方式没啥好讲的~ 无论消息异常与否 MQ 都会进行删除!
假如这个时候面试再问你,如何确保 RabbitMQ消息的可靠性?那你可得好好唠嗑唠嗑
如何保证消息不丢失?
消息丢失可能发生在 发送时丢失(未送达 exchange / 未路由到 queue)
、消息未持久化而MQ宕机
、消费者接收消息未能正确消费
确认机制包括 publisher-confirm
和 publisher-return
当未送达到 交换机 我们可以通过 publisher-confirm 返回的 ack
和 nack
来确认
当 交换机 未成功路由到 队列,我们可以通过 publisher-return
自定义的回调函数来确认,每个 RabbitTemplate 只能配置一个 ReturnCallback
持久化功能分为 交换机持久化
、队列持久化
和 消息持久化
,我们都需要将 durable 设置为 true
auto
级别消费者确认机制有三种类型:manual (手动确认)
、auto (自动确认)
、none (关闭 ack)
我们手动设置 MessageResoverer
为 RepublishMessageRecoverer 方式,将投递失败的消息转到异常队列中,交由人工处理
这一套组合拳回答下来,面试官还不得默默承认你有点东西?
当然这只是 RabbitMQ 的问题之一,我们下篇继续其他几个问题的解决方式~
不要空谈,不要贪懒,和小菜一起做个吹着牛X做架构
的程序猿吧~点个关注做个伴,让小菜不再孤单。咱们下文见!
今天的你多努力一点,明天的你就能少说一句求人的话!
我是小菜,一个和你一起变强的男人。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/543521
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。