当前位置:   article > 正文

RabbitMQ知识点总结_rabbitmq 只能监听消费 不能主动消费吗

rabbitmq 只能监听消费 不能主动消费吗

1、为什么使用MQ?MQ的优点
简答

异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。
应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。
流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
日志处理 - 解决大量日志传输。
消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

详答

主要是:解耦、异步、削峰。

解耦:A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。

异步:A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。

削峰:减少高峰时期对服务器压力。

2、消息队列有什么优缺点?RabbitMQ有什么优缺点?
优点上面已经说了,就是在特殊场景下有其对应的好处,解耦、异步、削峰。

缺点有以下几个:

系统可用性降低

本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统消息不能发出了。因此,系统可用性会降低;

系统复杂度提高

加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。

一致性问题

A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,你这数据就不一致了。

所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。

3、你们公司生产环境用的是什么消息中间件?
这个首先你可以说下你们公司选用的是什么消息中间件,比如用的是RabbitMQ,然后可以初步给一些你对不同MQ中间件技术的选型分析。

举个例子:比如说ActiveMQ是老牌的消息中间件,国内很多公司过去运用的还是非常广泛的,功能很强大。

但是问题在于没法确认ActiveMQ可以支撑互联网公司的高并发、高负载以及高吞吐的复杂场景,在国内互联网公司落地较少。而且使用较多的是一些传统企业,用ActiveMQ做异步调用和系统解耦。

然后你可以说说RabbitMQ,他的好处在于可以支撑高并发、高吞吐、性能很高,同时有非常完善便捷的后台管理界面可以使用。

另外,他还支持集群化、高可用部署架构、消息高可靠支持,功能较为完善。

而且经过调研,国内各大互联网公司落地大规模RabbitMQ集群支撑自身业务的case较多,国内各种中小型互联网公司使用RabbitMQ的实践也比较多。

除此之外,RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化,因此综合考虑过后,公司采取了RabbitMQ。

但是RabbitMQ也有一点缺陷,就是他自身是基于erlang语言开发的,所以导致较为难以分析里面的源码,也较难进行深层次的源码定制和改造,毕竟需要较为扎实的erlang语言功底才可以。

然后可以聊聊RocketMQ,是阿里开源的,经过阿里的生产环境的超高并发、高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景。

而且RocketMQ是基于Java语言开发的,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造。

另外就是Kafka。Kafka提供的消息中间件的功能明显较少一些,相对上述几款MQ中间件要少很多。

但是Kafka的优势在于专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计。

因此Kafka在大数据领域中配合实时计算技术(比如Spark Streaming、Storm、Flink)使用的较多。但是在传统的MQ中间件使用场景中较少采用。

Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

综上,各种对比之后,有如下建议:

一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;

后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;

不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

4、MQ 有哪些常见问题?如何解决这些问题?
MQ 的常见问题有:

消息的顺序问题
消息的重复问题

消息的顺序问题

消息有序指的是可以按照消息的发送顺序来消费。

假如生产者产生了 2 条消息:M1、M2,假定 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,怎么做?


解决方案:

(1)保证生产者 - MQServer - 消费者是一对一对一的关系

缺陷:

并行度就会成为消息系统的瓶颈(吞吐量不够)
更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

(2)通过合理的设计或者将问题分解来规避。
不关注乱序的应用实际大量存在
队列无序并不意味着消息无序 所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是一种更合理

拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

消息的重复问题

造成消息重复的根本原因是:网络不可达。

消费端处理消息的业务逻辑保持幂等性。只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。

5、什么是RabbitMQ?
RabbitMQ是一款开源的,Erlang编写的,基于AMQP协议的消息中间件

rabbitmq 的使用场景
(1)服务间异步通信

(2)顺序消费

(3)定时任务

(4)请求削峰

RabbitMQ基本概念
Broker: 简单来说就是消息队列服务器实体
Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
Queue: 消息队列载体,每个消息都会被投入到一个或多个队列
Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
Producer: 消息生产者,就是投递消息的程序
Consumer: 消息消费者,就是接受消息的程序
Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。

6、 RabbitMQ的工作模式
一.simple模式(即最简单的收发模式)


1.消息产生消息,将消息放入队列

2.消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。

二.work工作模式(资源的竞争)

1.消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。

三.publish/subscribe发布订阅(共享资源)

1、每个消费者监听自己的队列;

2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

四.routing路由模式
1.消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

2.根据业务功能定义路由字符串

3.从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4.业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;

五.topic 主题模式(路由模式的一种)

1.星号井号代表通配符

2.星号代表多个单词,井号代表一个单词

3.路由功能添加模糊匹配

4.消息产生者产生消息,把消息交给交换机

5.交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

(在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式)

7、消息处理
消息如何分发?
若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能

消息怎么路由?
消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);

常用的交换器主要分为一下三种:

fanout:如果交换器收到消息,将会广播到所有绑定的队列上

direct:如果路由键完全匹配,消息就被投递到相应的队列

topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符

消息基于什么传输?
由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ 使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。

如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?

发送方确认模式

将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。

一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。

如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。

发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

接收方确认机制

消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。

这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数据的最终一致性;

下面罗列几种特殊情况

如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。

如何保证RabbitMQ消息的可靠传输?
消息不可靠的情况可能是消息丢失,劫持等原因;

丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;

生产者丢失消息:从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;

transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;

confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;

rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;

如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

消息队列丢数据:消息持久化。

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。

这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。

这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

那么如何持久化呢?

这里顺便说一下吧,其实也很容易,就下面两步

将queue的持久化标识durable设置为true,则代表是一个持久的队列
发送消息的时候将deliveryMode=2
这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据

消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!

消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;

如果这时处理消息失败,就会丢失该消息;

解决方案:处理消息成功后,手动回复确认消息。

为什么不应该对所有的 message 都使用持久化机制?
首先,必然导致性能的下降,因为写磁盘比写 RAM 慢的多,message 的吞吐量可能有 10 倍的差距。

其次,message 的持久化机制用在 RabbitMQ 的内置 cluster 方案时会出现“坑爹”问题。矛盾点在于,若 message 设置了 persistent 属性,但 queue 未设置 durable 属性,那么当该 queue 的 owner node 出现异常后,在未重建该 queue 前,发往该 queue 的 message 将被 blackholed ;若 message 设置了 persistent 属性,同时 queue 也设置了 durable 属性,那么当 queue 的 owner node 异常且无法重启的情况下,则该 queue 无法在其他 node 上重建,只能等待其 owner node 重启后,才能恢复该 queue 的使用,而在这段时间内发送给该 queue 的 message 将被 blackholed 。

所以,是否要对 message 进行持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到 100,000 条/秒以上的消息吞吐量(单 RabbitMQ 服务器),则要么使用其他的方式来确保 message 的可靠 delivery ,要么使用非常快速的存储系统以支持全持久化(例如使用 SSD)。另外一种处理原则是:仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶颈。

8、集群
如何保证高可用的?RabbitMQ 的集群
RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式

普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

镜像集群模式:这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。

如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
消息积压处理办法:临时紧急扩容:

先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。

**MQ中消息失效:**假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

**mq消息队列块满了:**如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

9、RabbitMQ死信队列
死信队列 是 当消息在一个队列 因为下列原因:
消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
队列超载

变成了 “死信” 后 被重新投递(publish)到另一个Exchange 该Exchange 就是DLX 然后该Exchange 根据绑定规则 转发到对应的 队列上 监听该队列 就可以重新消费 说白了 就是 没有被消费的消息 换个地方重新被消费
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者

什么是死信呢?什么样的消息会变成死信呢?
消息被拒绝(basic.reject或basic.nack)并且requeue=false.
消息TTL过期
队列达到最大长度(队列满了,无法再添加数据到mq中)

应用场景分析
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了

如何使用死信交换机呢?
定义业务(普通)队列的时候指定参数
x-dead-letter-exchange: 用来设置死信后发送的交换机
x-dead-letter-routing-key:用来设置死信的routingKey

10、其他
1、 RabbitMQ中的broker是指什么?cluster又是指什么?
broker是指一个或多个erla ng node的逻辑分组,且node上运行着 RabbitMQ应用程序。cluster是在broker的基础之上,增加了 node之间 共享元数据的约束。
2、 什么是元数据?元数据分为哪些类型?包括哪些内容?与cluster相关的 元数据有哪些?元数据是如何保存的?元数据在cluster中是如何分布的? 在非cluster模式下,元数据主要分为Queue元数据(queue名字和属性 等)、Exchange元数据(exchange名字、类型和属性等)、Binding元数据 (存放路由关系的查找表)、Vhost元数据(vhost范围内针对前三者的名字空 间约束和安全属性设置)。在cluster模式下,还包括cluster中node位置 信息和node关系信息。元数据按照erla ng node的类型确定是仅保存于 RAM中,还是同时保存在RAM和disk上。元数据在cluster中是全node 分布的。
3、 RAM node 和 disk node 的区别?
RAM node 仅将 fabric(即 queue、exchange 和 binding等 RabbitMQ基础构 件)相关元数据保存到内存中,但disk node会在内存和磁盘中均进行存 储。RAM node上唯一会存储到磁盘上的元数据是cluster中使用的disk node的地址。要求在RabbitMQ cluster中至少存在一个disk node。
4、 RabbitMQ上的一个queue中存放的message是否有数量限制? 可以认为是无限制,因为限制取决于机器的内存,但是消息过多会导致处 理效率的下降。
5、 vhost是什么?起什么作用?
vhost可以理解为虚拟broker ,即mini-RabbitMQ servero其内部均含有 独立的queue、exchange和binding等,但最最重要的是,其拥有独立的 权限系统,可以做到vhost范围的用户控制。当然,从RabbitMQ的全局 角度,vhost可以作为不同权限隔离的手段(一个典型的例子就是不同的应 用可以跑在不同的vhost中)。
6、 在单node系统和多node构成的cluster系统中声明queue、 exchange,以及进行binding会有什么不同?
当你在单node上声明queue时,只要该node上相关元数据进行了变 更,你就会得到Queue.Declare-ok回应;而在cluster上声明queue,则要 求cluster上的全部node都要进行元数据成功更新,才会得到 Queue.Declare-ok回应。另外,若node类型为RAM node则变更的数据 仅保存在内存中,若类型为disk node则还要变更保存在磁盘上的数据。
7、 客户端连接到cluster中的任意node上是否都能正常工作?
是的。客户端感觉不到有何不同。
8、 若cluster中拥有某个queue的owner node失效了,且该queue 被声明具有durable属性,是否能够成功从其他node上重新声明该 queue ?
不能,在这种情况下,将得到404 NOT_FOUND错误。只能等queue所 属的node恢复后才能使用该queue。但若该queue本身不具有durable 属性,则可在其他node上重新声明。
9、 cluster中node的失效会对consumer产生什么影响?若是在 cluster 中创建了 mirrored queue ,这时 node 失效会对 consumer 产 生什么影响?
若是con sumer所连接的那个node失效(无论该node是否为con sumer 所订阅queue的own er node),则con sumer会在发现TCP连接断开时, 按标准行为执行重连逻辑,并根据“ Assume Nothi ng”原则重建相应的 fabric即可。若是失效白勺n ode为con sumer订阅queue白勺owner no de, 则 con sumer 只能通过 Con sumer Ca ncellation Notification 机制来检测与 该queue订阅关系的终止,否则会出现傻等却没有任何消息来到的问 题。
10、 能够在地理上分开的不同数据中心使用RabbitMQ cluster么? 不能。
第一,你无法控制所创建的queue实际分布在cluster里的哪个node上 (一般使用HAProxy + cluster模型时都是这样),这可能会导致各种跨地域 访问时的常见问题;
第二,Erlang的OTP通信框架对延迟的容忍度有限,这可能会触发各种 超时,导致业务疲于处理;
第三,在广域网上的连接失效问题将导致经典的“脑裂”问题,而 RabbitMQ目前无法处理(该问题主要是说Mn esia)。
11、 为什么heavy RPC的使用场景下不建议采用disk node
heavy RPC是指在业务逻辑中高频调用RabbitMQ提供的RPC机制,导 致不断创建、销毁reply queue ,进而造成disk node的性能问题(因为会 针对元数据不断写盘)。所以在使用RPC机制时需要考虑自身的业务场 景。
12、 向不存在的exchange发publish消息会发生什么?向不存在的 queue执行consume动作会发生什么?
都会收到Cha nn el.Close信令告之不存在(内含原因404 NOT_FOUND)。
13、 routing_key和binding_key的最大长度是多少?
255字节。
14、 RabbitMQ允许发送的message最大可达多大?
根据AMQP协议规定,消息体的大小由64-bit的值来指定,所以你就可 以知道到底能发多大的数据了。
15、 什么情况下producer不主动创建queue是安全的?
〔.message是允许丢失的;
2•实现了针对未处理消息的republish功能(例如采用Publisher Confirm机 制)。
16、 “dead letter”queue 的用途?
当消息被RabbitMQ server投递到con sumer后,但con sumer却通过 Basic.Reject进行了拒绝时(同时设置requeue=false),那么该消息会被放 入“dead letter"queue 中。该 queue 可用于排查 message 被 reject 或 un deliver 的原因。
17、 什么说保证message被可靠持久化的条件是queue和exchange 具有durable属性,同时message具有persistent属性才行? binding关系可以表示为excha nge - binding - queue。从文档中我们知 道,若要求投递的message能够不丢失,要求message本身设置 persiste nt属性,要求excha nge和queue者牧设置durable属性。其实这问 题可以这么想,若excha nge或queue未设置durable属性,则在其 crash之后就会无法恢复,那么即使message设置了 persiste nt属性,仍 然存在message虽然能恢复但却无处容身的问题;同理,若message本身 未设置persiste nt属性,则message的持久化更无从谈起。
18、 什么情况下会出现blackholed问题?
blackholed问题是指,向excha nge投递了 message,而由于各种原因导 致该message丢失,但发送者却不知道。可导致blackholed的情况:
1•向未绑定queue白勺excha nge发送 message;
2.excha nge 以 binding key key A 绑定了 queue queue A ,但向该
excha nge发送 message使用白勺 routing key 却是 key B o

19、 如何防止出现blackholed问题?
没有特别好的办法,只能在具体实践中通过各种方式保证相关fabric的存 在。另外,如果在执行Basic.Publish时设置man datory=true,则在遇到 可能出现blackholed情况时,服务器会通过返回Basic.Return告之当前 message无法被正确投递(内含原因312 NO_ROUTE)。
20、 Consumer Cancellation Notification 机制用于什么场景?
用于保证当镜像queue中master挂掉时,连接到slave上的con sumer可 以收到自身con sume被取消的通知,进而可以重新执行con sume动作从 新选出的master出获得消息。若不采用该机制,连接到slave上的 con sumer将不会感知master挂掉这个事情,导致后续无法再收到新 master广播出来的message。另外,因为在镜像queue模式下,存在将 message进行requeue的可能,所以实现con sumer的逻辑时需要能够正 确处理出现重复message的情况。
21、 Basic.Reject的用法是什么?
该信令可用于con sumer对收到的message进行reject。若在该信令中设 置requeue=true,则当RabbitMQ server收到该拒绝信令后,会将该 message重新发送到下一个处于con sume状态的con sumer处(理论上仍 可能将该消息发送给当前con sumer)。若设置requeue=false,则 RabbitMQ server在收到拒绝信令后,将直接将该message从queue中移 除。
另外一种移除queue中message的小技巧是,consumer回复Basic.Ack 但不对获取到的message做任何处理。
而Basic.Nack是对Basic.Reject的扩展,以支持一次拒绝多条message 的能力。
22、 为什么不应该对所有的message都使用持久化机制?
首先,必然导致性能的下降,因为写磁盘比写RAM慢的多,message的 吞吐量可能有10倍的差距。其次,message的持久化机制用在 RabbitMQ的内置cluster方案时会出现"坑爹”问题。矛盾点在于,若 message设置了 persiste nt属性,但 queue未设置 durable属性,那么当 该queue的own er node出现异常后,在未重建该queue前,发往该 queue 白勺 message 将被 blackholed ;若 message 设置了 persiste nt 属性, 同时queue也设置了 durable属性,那么当queue的own er node异常且 无法重启的情况下,则该queue无法在其他node上重建,只能等待其 owner node重启后,才能恢复该queue的使用,而在这段时间内发送给 该queue的message将被blackholed。所 以,是否要对message进行 持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到 100,000条/秒以上的消息吞吐量(单RabbitMQ服务器),则要么使用其他 的方式来确保message的可靠delivery,要么使用非常快速的存储系统以 支持全持久化(例如使用SSD)。另外一种处理原则是:仅对关键消息作持久 化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶 颈。
23、RabbitMQ 中的 cluster、mirrored queue,以及 warrens 机制分 别用于解决什么问题?存在哪些问题?
cluster是为了解决当cluster中的任意node失效后,producer和
con sumer均可以通过其他node继续工作,即提高了可用性;另外可以通 过增加node数量增加cluster的消息吞吐量的目的。cluster本身不负责 message的可靠性问题(该问题由producer通过各种机制自行解 决);cluster无法解决跨数据中心的问题(即脑裂问题)。
另外,在cluster前使用HAProxy可以解决node的选择问题,即业务无 需知道cluster中多个node的ip地址。可以利用HAProxy进行失效node 的探测,可以作负载均衡。
Mirrored queue是为了解决使用cluster时所创建的queue的完整信息仅 存在于单一node上的问题,从另一个角度增加可用性。若想正确使用该 功能,需要保证:

consumer 需要支持 Consumer Cancellation Notification 机制;
co nsumer必须能够正确处理重复message。
Warre ns是为了解决cluster中message可能被blackholed的问题,即不 能接受 producer 不停 republish message 但 RabbitMQ server 无回应的情 况。Warrens有两种构成方式:
—种模型是两台独立的RabbitMQ server + HAProxy,其中两个server的 状态分别为active和hot-sta ndby。该模型的特点为:两台server之间无任 何数据共享和协议交互,两台server可以基于不同的RabbitMQ版本。
另一种模型为两台共享存储的RabbitMQ server + keepalived,其中两个 server 的状态分别为 active 和 cold-standby。
该模型的特点为:两台server基于共享存储可以做到完全恢复,要求必须 基于完全相同的RabbitMQ版本。
Warre ns模型存在的问题:
对于第一种模型,虽然理论上讲不会丢失消息,但若在该模型上使用持久 化机制,就会出现这样一种情况,即若作为active的server异常后,持久 化在该server上的消息将暂时无法被con sume,因为此时该queue将无 法在作为hot- sta ndby的server上被重建,所以,只能等到异常的active server恢复后,才能从其上的queue中获取相应的message进行处理。 而对于业务来说,需要具有:a.感知AMQP连接断开后重建各种fabric的能 力;b.感知active server恢复的能力;c.切换回active server的时机控制,以 及切回后,针对message先后顺序产生的变化进行处理的能力。
对于第二种模型,因为是基于共享存储的模式,所以导致active server异 常的条件,可能同样会导致cold-sta ndby server异常;另外,在该模型下, 要求active和cold-standby的server必须具有相同的node名和UID,否 则将产生访问权限问题;最后,由于该模型是冷备方案,故无法保证cold- sta ndby server能在你要求的时限内成功启动。
消息什么情况下会丢失?
配合mandatory参数或备份交换器来提高程序的健壮性

发送消息的交换器并没有绑定任何队列,消息将会丢失
交换器绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配

预估队列的使用情况?
在后期运行过程中超过预定的阈值,可以根据实际情况对当前集群进行扩容或者将相应的队列迁移到其他集群。

消费消息?
推模式,拉模式

保证消息的可靠性?
RabbitMQ 提供了消息确认机制( message acknowledgement)。 消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false 时, RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上 是先打上删除标记,之后再删除)。当 autoAck 等于 true 时, RabbitMQ 会自动把发送出去的 消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

在ack为false的情况下,消费者获取消息迟迟没有发送消费者确认消息的信号或者消费者断开,怎么办?
当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分: 一部分是等待投递给消费者的消息:一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息。 如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经 断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可 能还是原来的那个消费者。RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否己经断开,这么设计的原因是 RabbitMQ 允许消费者 消费一条消息的时间可以很久很久。

在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,那么应该怎么做呢?
RabbitMQ 在 2.0.0 版本开始引入了 Basic .Reject 这个命令,消费者客户端可以调用与其对 应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。

//Channel 类中的 basicReject 方法定义如下: //其中 deliveryTag 可以看作消息的编号 ,它是一个 64 位的长整型值,最大值是 9223372036854775807。如果 //requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入 队列,以便可以发送给下一个订阅的消费者;如果 //requeue 参数设置为 false,则 RabbitMQ 立即会把消息从队列中移除,而不会把它发送给新的消费者。 void basicReject(long deliveryTag, boolean requeue) throws IOException

注意:

Basic.Reject 命令一次只能拒绝一条消息 ,如果想要批量拒绝消息 ,则可以使用 Basic.Nack 这个命令

//消费者客户端可以调用 channel.basicNack 方法来实现,方法定 义如下: //其中 deliveryTag 和 requeue 的含义可以参考 basicReject 方法。 multiple 参数 //设置为 false 则表示拒绝编号为 deliveryT坷的这一条消息,这时候 basicNack 和 basicReject 方法一样; //multiple 参数设置为 true 则表示拒绝 deliveryTag 编号之前所 有未被当前消费者确认的消息。 void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException

注意:

将 channel.basicReject 或者 channel.basicNack 中的 requeue 设直为 false,可以启用”死信队列”的功能。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题

请求RabbitMQ重新发送还未被确认的消息?
//Basic.Recover 具备可重入队列的特性 Basic.RecoverOk basicRecover() throws IOException; Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

channel.basicRecover 方法用来请求 RabbitMQ 重新发送还未被确认的消息。 如果 requeue 参数设置为 true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息 来说,可能会被分配给与之前不同的消费者。如果 requeue 参数设置为 false,那么同一条消 息会被分配给与之前相同的消费者。默认情况下,如果不设置 requeue 这个参数,相当于

channel.basicRecover(true) ,即 requeue 默认为 true

交换器无法根据自身的类型和路由键找到一个符合条件 的队列

当 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件 的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参 数设置为 false 时,出现上述情形,则消息直接被丢弃

生产者如何获取到没有被正确路由到合适队列的消息呢?
可以通过调用channel.addReturnListener来添加ReturnListener监听器实现。RabbitMQ 会通过 Basic . Return 返回 “mandatory test” 这条消息,之后生产者客户端通过 ReturnListener 监昕到了这个事 件,上面代码的最后输出应该是”Basic.Retum 返回的结果是: mandatory test”

mandatory和immediate参数的区别
mandatory 参数告诉服务器至少将该消息路由到一个队列中, 否则将消息返 回给生产者。 immediate 参数告诉服务器, 如果该消息关联的队列上有消费者, 则立刻投递: 如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者, 不用将消息存入队列而等 待消费者了。

未被路由到的消息应该怎么处理?
发送消息的时候设置mandatory参数,添加ReturnListener监听器接收未被路由到的返回消息

采用备份交换器AE,可以将未被路由的消息存储在RabbitMQ中,通过声明交换器的时候添加AE参数实现,或者通过策略的方式实现,同时使用,前者优先级高,会覆盖掉Policy的设置

备份交换器需要注意?
如果设置的备份交换器不存在,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失

如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失

如果备份交换器没有任何匹配的队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失

如果备份交换器和mandatory参数一起使用,那么mandatory参数无效

怎么为消息设置过期时间TTL?
通过队列属性设置,队列中所有消息都有相同的过期时间,声明队列的时候在channel.queueDeclare加入TTL参数

对消息本身进行单独设置,每条消息的TTL可以不同,在channel.basicPublish方法参数中设置

同时使用以上两种方式设置过期时间,以较小的为准

消息在队列中的生存时间一旦超过设置的TTL值,就变成死信,消费者无法再收到该消息(不是绝对的)

如果不设置 TTL.则表示此消息不会过期;如果将 TTL 设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃,这个特性可以部分替代 RabbitMQ 3.0 版本之前的 immediate 参数

对过期消息处理?
设置队列 TTL 属性的方法,一旦消息过期,就会从队列中抹去,队列中己过期的消息肯定在队 列头部, RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可,

消息本身进行单独设置,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将 被消费时再判定是否过期, 如果过期再进行删除即可。

怎么设置队列的过期时间?
通过 channel . queueDeclare 方法中的 x-expires 参数可以控制队列被自动删除前处 于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过 Basic . Get 命令。

RabbitMQ 会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时 。在 RabbitMQ 重启后,持久化的队列的过期时间会被重新计算。

什么是死信队列?
DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。

DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定, 实 际上就是设置某个队列的属性。当这个队列中存在死信时 , RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

什么是延迟队列?
延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费

延迟队列应用场景?
订单系统,用延迟队列处理超时订单
用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将 用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。

持久化?
交换器的持久化

交换器的持久化是通过在声明交换器时将 durable 参数置为 true 实现的,如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换器元数据会丢失, 不过消息不会丢失,只是不能将消息发送到这个交换器中了。对一个长期使用的交换器来说,建议将其置为持久化的。

队列的持久化

队列的持久化是通过在声明队列时将 durable 参数置为 true 实现的,如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。

消息的持久化

通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现消息的持久化。

在这段时间内 RabbitMQ 服务节点发生了岩机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将RabbitMQ 实战指南会丢失。这个问题怎么解决呢?

可以引入 RabbitMQ 的镜像队列机制,相当于配置了副本,如果主节点 Cmaster) 在此特殊时间内挂掉,可以自动切换到从节点 Cslave ), 这样有效地保证了高可用性

当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?
通过事务机制实现,比较消耗性能

客户端发送 Tx.Select. 将信道置为事务模式;

Broker 回复 Tx. Select-Ok. 确认己将信道置为事务模式:

在发送完消息之后,客户端发送 Tx.Commit 提交事务;

Broker 回复 Tx. Commi t-Ok. 确认事务提交。

通过发送方确认机制实现

消费端对消息的处理?
过推模式或者拉模式的方 式来获取井消费消息,当消费者处理完业务逻辑需要手动确认消息己被接收,这RabbitMQ才能把当前消息从队列中标记清除

如果消费者由于某些原因无法处理当前接收到的消息, 可以通过 channel . basicNack 或者 channel . basicReject 来拒绝掉。

消费端存在的问题?
消息分发

同一个队列拥有多个消费者,会采用轮询的方式分发消息给消费者,若其中有的消费者任务重,有的消费者很快处理完消息,导致进程空闲,这样对导致整体应用吞吐量下降,为了解决上面的问题,用到channel.basicQos 方法允许限制信道上的消费者所能保持的最大未确认消息的数量。Basic.Qos 的使用对于拉模式的消费方式无效.

举例如下:

在订阅消费队列之前,消费端程序调用了 channel.basicQos(5) ,之后订 阅了某个队列进行消费。 RabbitMQ 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。 直到消费者确认了某条消息之后 , RabbitMQ将相应的计数减1,之后消费者可以继续接收消息, 直到再次到达计数上限。这种机制可以类比于 TCP!IP中的”滑动窗口”。

消息顺序性
生产者使用了事务机制可能会破坏消息顺序性

生产者发送消息设置了不同的超时时间,并且设置了死信队列

消息设置了优先级

可以考虑在消息体内添加全局有序标识来实现

弃用QueueingConsumer,Spring提供的RabbitMQ采用的是DefaultConsume

内存溢出,由于某些原因,队列之中堆积了比较多的消息,可能导致消费者客户端内存溢出假死,发生恶性循环,使用 Basic . Qos 来解决,一定要在调用 Basic . Consume 之前调用 Basic.Qos

才能生效。

会拖累同一个connection下的所有信道,使其性能降低

同步递归调用QueueingConsumer会产生死锁

RabbitMQ的自动连接恢复机制不支持QueueingConsumer这种形式

QueueingConsumer不是事件驱动的

消息传输保障?
一般消息中间件的消息传输保障分为三个等级

At most once: 最多一次。消息可能会丢失,但绝不会重复传输。

At least once: 最少一次。消息绝不会丢失,但可能会重复传输。

Exactly once: 恰好一次。每条消息肯定会被传输一次且仅传输一次。

RabbitMQ支持其中的“最多一次”和“最少一次”。

其中”最少一次”投递实现需要考虑 以下这个几个方面的内容:

消息生产者需要开启事务机制或者 publisher confirm 机制,以确保消息可以可靠地传 输到 RabbitMQ 中。

消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器 路由到队列中,进而能够保存下来而不会被丢弃。

消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不会造成消息丢失。

消费者在消费消息的同时需要将 autoAck 设置为 false,然后通过手动确认的方式去 确认己经正确消费的消息,以避免在消费端引起不必要的消息丢失。

“最多一次”的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这 样很难确保消息不会丢失。

提高数据可靠性途径?
设置 mandatory 参数或者备份交换器 (immediate 参数己被陶汰);

设置 publisher conflITll机制或者事务;

设置交换器、队列和消息都为持久化;

设置消费端对应的 autoAck 参数为 false 井在消费完消息之后再进行消息确认

RabbitMQ 幂等性概念及业界主流解决方案
一、什么是幂等性
可以参考数据库乐观锁机制,比如执行一条更新库存的 SQL 语句,在并发场景,为了性能和数据可靠性,会在更新时加上查询时的版本,并且更新这个版本信息。可能你要对一个事情进行操作,这个操作可能会执行成百上千次,但是操作结果都是相同的,这就是幂等性。

二、消费端的幂等性保障
在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。

业界主流的幂等性有两种操作:

1.唯一 ID + 指纹码 机制,利用数据库主键去重

2.利用redis的原子性去实现

三、唯一 ID + 指纹码 机制
大家肯定懂唯一 ID 的,就不多说了,为什么需要指纹码呢?这是为了应对用户在一瞬间的频繁操作,这个指纹码可能是我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中。

好处就是实现简单,就一个拼接,然后查询判断是否重复。

坏处就是在高并发时,如果是单个数据库就会有写入性能瓶颈

解决方案 :根据 ID 进行分库分表,对 id 进行算法路由,落到一个具体的数据库,然后当这个 id 第二次来又会落到这个数据库,这时候就像我单库时的查重一样了。利用算法路由把单库的幂等变成多库的幂等,分摊数据流量压力,提高性能。

四、利用 redis 的原子性去实现
相信大家都知道 redis 的原子性操作,我这里就不需要过多介绍了。

使用 redis 的原子性去实现需要考虑两个点

一是 是否 要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性? 数据库与缓存进行同步肯定要进行写操作,到底先写 redis 还是先写数据库,这是个问题,涉及到缓存更新与淘汰的问题

二是如果不落库,那么都存储到缓存中,如何设置定时同步的策略? 不入库的话,可以使用双重缓存等策略,保障一个消息副本,具体同步可以使用类似 databus 这种同步工具。

五、怎么保证消息队列消费的幂等性?
先大概说一说可能会有哪些重复消费的问题。

首先就是比如rabbitmq、rocketmq、kafka,都有可能会出现消费重复消费的问题,正常。因为这问题通常不是mq自己保证的,是给你保证的。然后我们挑一个kafka来举个例子,说说怎么重复消费吧。

kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的offset来继续消费吧。

但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset,尴尬了。重启之后,少数消息会再次消费一次。

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

给你举个例子吧。假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性

幂等性,我通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

六、其实还是得结合业务来思考,我这里给几个思路:
(1)比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update一下好吧

(2)比如你是写redis,那没问题了,反正每次都是set,天然幂等性

(3)比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据

如何保证MQ的消费是幂等性的,需要结合具体的业务来看

rabbitmq保证消息可靠 消息不丢失
三端可靠
发送方和mq保证消息送达到mq
mq保证保存的消息不丢失
消费方和mq一起保证消息被成功消费
发送方和mq保证消息送达到mq
方案一、rabbitmq如果是用spring boot提供的模版接口发送 需要调用rabbitTemplate.convertSendAndReceive()方法发送 这个是当消息成功到队列了才会返回结果 如果失败则会抛异常 不过这就会导致等待时间比较长 适合高可靠场景
不过一般在业务开发都是完成业务以后再发消息 比如插入订单表一笔订单 发送订单创建的消息 这两步是需要保证原子性的 要么都成功要么都失败 rabbitmq支持事务消息 不过如果出现下面情况

开启事务
插入订单表
发送mq消息
提交数据库事务成功
提交mq事务失败
消息丢失
所以如果是用rabbitmq的事务消息来做 其实在极端情况是会丢失消息的 在这里可以采用一个异步命令组件提供的方案https://github.com/bojiw/asyncmd

开启事务
插入订单表
插入异步命令表
提交数据库事务
线程扫描异步命令表捞取消息
通过rabbitTemplate.convertSendAndReceive()方法发送
如果失败 则重试 并且报警
方案二、如采用rabbitTemplate.convertAndSend和confirms(消费回调)加Return(错误回调)模式

convertAndSend 发送到mq 立刻返回 不管交换机是否成功处理 所以并发会高
confirms(消费回调) 实现接口ConfirmCallback 消息成功发送到rabbitmq交换机上则会回调接口 入参ack为true代表成功发送到交换机 false代表异常
Return(错误回调) 实现接口ReturnCallback 消息从交换机到队列 成功不会回调 如果发送到队列失败 则会调用回调
上面这种方式如果在回调中处理消息发送失败的逻辑时出现异常或者应用服务器挂了 则会导致消息丢失 因为只会回调一次
这种情况可以采用加一张消息表 先插入消息表 然后扫表发送消息 confirms回调成功 则更新表状态 如果回调的时候异常 则消息表会重新发送 这种就会出现消息重发的情况 不过一般消息消费者都要保证幂等 所以这个问题不大 不过如果出现以下情况

数据库有两个字段 confirms默认0 和 return 默认0
回调成功confirms=1 回调失败confirms=2 错误回调return=2
当回调成功 confirms=1 错误回调处理失败没有成功更新表 则return还是0
这个时候你扫表就不确定需不需要重发消息 因为如果消息成功到队列 表的状态也是confirms=1 return=0
无法对发送队列成功和发送队列失败可在回调异常这两种情况做区分
这里逻辑就会出问题 所以只能处理消息成功到交换机 是否到队列则不管 因为一般都是成功的 除了极端情况 比如队列被人误删除
方案二和方案一其实从整个流程来讲 发送消息速度其实差不多的 不过可靠性还是方案一高一点

mq保证保存的消息不丢失
消息、交换机、队列都需要设置持久化

消费方和mq一起保证消息被成功消费
消费者开启手动确认

acknowledge=“manual”
1
在业务代码里 成功处理业务 才返回给rabbitmq消费成功的确认

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
1
如果业务处理失败则重新放到队列重新消费

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
1
由消费者 只有业务成功处理才进行ack 记得需要做好幂等
不过这里rabbitmq在重试这块没有做好 如果不确定会一直重试 如果因为依赖的一个系统挂了 要一个小时以后才会启动成功 在这一个小时里会一直重试 这就会对rabbitmq和消费者带来一定的压力 这块也可以采用异步命令组件提供的方案https://github.com/bojiw/asyncmd

接收消息
把消息插入异步命令
返回rabbitmq成功
异步组件执行业务逻辑
调用接口失败重试
重试一定次数则不重试 由人工进行处理 也可以把重试间隔设置的长一点 比如前三次每隔1s重试 第四次隔一个小时重试




版权声明:本文为CSDN博主「Damon_zqt」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Damon_zqt/article/details/105948616

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号