当前位置:   article > 正文

RabbitMQ 基础概念_rabbitmq基础概念

rabbitmq基础概念

博文目录


MQ

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

MQ的优势

  1. 应用解耦: 提高系统容错性和可维护性。系统的耦合性越高,容错性就越低,可维护性就越低。
  2. 异步提速: 提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
  3. 削峰填谷: 提高系统稳定性。使用了 MQ 之后,限制消费消息的最高速度,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。

MQ的劣势

  1. 系统可用性降低: 系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
  2. 系统复杂度提高: MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息不被丢失等情况?

常见的MQ产品

在这里插入图片描述
ActiveMQ已经很老了, 现在几乎不再使用了, Kafka更多的是用在大数据领域, RabbitMQ因为更注重安全方面, 所以在性能上有一定的损失, 但是金融公司一般都是使用的它

RabbitMQ

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
在这里插入图片描述
在这里插入图片描述

AMQP

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
在这里插入图片描述

Broker

接收和分发消息的应用,RabbitMQ Server就是 Message Broker

Virtual host

出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection

publisher/consumer 和 broker 之间的 TCP 连接

Channel

如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

Exchange

message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key(就是队列名),分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue

消息最终被送到这里等待 consumer 取走

Binding

exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

工作模式

HelloWorld 简单模式、WorkQueues 工作队列模式、PublishSubscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;先不看)
在这里插入图片描述

重要说明

5种模式, 这些模式其实并不是互不相干的

  1. HelloWorld是最基础的, 就是一个Queue与一个Consumer的关系
  2. WorkQueues也是很基础的, 就是一个Queue与多个Consume的关系, 非常重要的是, 多个Consumer之间是竞争关系, HelloWorld和WorkQueues不需要显式的绑定Exchange
  3. PublishSubscribe是一个Exchange与一个或多个Queue的关系, Queue之间不是竞争的关系, 每个Queue都能拿到全部的消息. 需显式的绑定Exchange(这里的Exchange为Fanout广播模式), 而不用指定RoutingKey(但需指定为空字符串?). 而每个Queue根据连接的Consumer的数量又能组成HelloWorld或WorkQueues的关系
  4. Routing相当于是在PublishSubscribe的基础上, 又对RoutingKey做了一层匹配, Queue需要显式的绑定Exchange(这里的Exchange为Direct定向模式), 同时还得指定RoutingKey, 还需要发布者发布消息时也指定RoutingKey, 当Exchange,Queue,RoutingKey都匹配的时候, Consumer才能收到消息, Queue与Consumer的关系同样可以是HelloWorld和WorkQueues
  5. Topics相当于是在Routing的基础上, 扩展了RoutingKey的匹配规则(这里的Exchange为Topic主题模式), RoutingKey是用"."来做分割符的, "*"能匹配一段, "#"能匹配任意断, Queue与Consumer的关系同样可以是HelloWorld和WorkQueues
总结
  1. 简单模式 HelloWorld
    一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
  2. 工作队列模式 WorkQueues
    一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
  3. 发布订阅模式 PublishSubscribe
    需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
  4. 路由模式 Routing
    需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
  5. 通配符模式 Topics
    需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列

HelloWorld

在这里插入图片描述
P:生产者,也就是要发送消息的程序
C:消费者:消息的接收者,会一直等待消息到来
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

WorkQueues

在这里插入图片描述
与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息

使用场景

对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

小结

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

PublishSubscribe

在这里插入图片描述
在发布订阅模型中,多了一个 Exchange 角色,而且过程略有变化

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

  1. Fanout:广播,将消息交给所有绑定到交换机的队列
  2. Direct:定向,把消息交给符合指定routing key 的队列
  3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

使用场景

发布订阅模式因为所有消费者获得相同的消息, 所以特别适合 数据提供商与应用商

小结

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:

  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机

Routing

在这里插入图片描述
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey

Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

小结

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列

Topics

在这里插入图片描述
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
在这里插入图片描述
红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

小结

Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活

监听机制

分两种, 分别是Producer到Broker的Confirm监听和Exchange到Queue的Return监听

特别注意: 这两种状态只代表生产者与Broker之间消息投递的情况, 与消费者是否接收/确认消息无关

Confirm

Producer将消息发送到Broker时产生的状态, 后续会出现两种情况

  • ack: Broker已经将数据接收
  • nack: Broker拒收消息, 可能是队列已满, 限流, IO异常等等

Return

Return代表消息被Broker正常接收(ack)后, 但Broker没有对应的队列进行投递时产生的状态, 消息被退回给生产者

高级特性

消息可靠投递

生产者 confirm return

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式

  • confirm 确认模式
  • return 退回模式

rabbitmq 整个消息投递的路径为, producer —> rabbitmq broker —> exchange —> queue —> consumer

消息从 producer 到 exchange 则触发 ConfirmCallback, 消息从 exchange 到 queue 投递失败则会触发 ReturnCallback, 利用这两个 callback 控制消息的可靠性投递

小结

设置ConnectionFactory的publisher-confirms=“true” (新版本:confirm-type=“CORRELATED”)开启 确认模式。使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理

设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了 rabbitTemplate.setMandatory(true) 参数,则会将消息退回给producer。并执行回调函数returnedMessage

消费者 ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式

  1. 自动确认:acknowledge=“none”
  2. 手动确认:acknowledge=“manual”
  3. 根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。也可以丢弃该消息

如果是手动签收模式, 但是不签收, 即不ack也不nack, 则消息还会存在队列中, 只不过会从ready状态转成unacked状态, 而不会被队列清除
在channel没有断开之前, 所有不ack也不nack的消息, 都可以在某一时刻通过批量签收而被队列清除
如果channel断开之前都没有签收消息, 则在断开时, 消息会从unacked状态重新转成ready状态, 等待下个连接消费

小结

在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认

如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息

如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息

消息的可靠投递总结

  1. 持久化, exchange要持久化, queue要持久化, message要持久化
  2. 生产方确认Confirm
  3. 消费方确认Ack
  4. Broker高可用

消费端限流

  1. 在< rabbit:listener-container> 中配置 prefetch属性设置消费端一次拉取多少消息
  2. 消费端的确认模式一定为手动确认。acknowledge=“manual”

ttl

TTL 全称 Time To Live(存活时间/过期时间), 当消息到达存活时间后,还没有被消费,会被自动清除

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间

小结

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期

设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期

如果两者都进行了设置,以时间短的为准

死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX
在这里插入图片描述

消息成为死信的三种情况

  1. 队列消息长度到达限制
  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
  3. 原队列存在消息过期设置,消息到达超时时间未被消费

队列绑定死信交换机

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

小结

死信交换机和死信队列和普通的没有区别

当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

延时队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费

在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果
在这里插入图片描述

消息积压

原因

  • 消费者宕机积压
  • 消费者消费能力不足积压
  • 发送者发流量太大

解决方案
上线更多的消费者(工作队列方式), 上线专门的队列消费服务, 将消息先批量取出来,记录数据库,再慢慢处理

消息幂等性保障

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同

在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果

每个消息用一个唯一标识来区分,消费前先判断标识有没有被消费过,若已消费过,则直接ACK

一些面试题

1、保证消息不丢失(三步)
1.1、开启事务(不推荐)
1.2、开启confirm(推荐)
1.3、开启RabbitMQ持久化(交换机、队列、消息)
1.4、关闭RabbitMQ自动ack(改成手动)

2、保证消息不重复消费
2.1、幂等性(每个消息用一个唯一标识来区分,消费前先判断标识有没有被消费过,若已消费过,则直接ACK)

3、RabbitMQ如何保证消息的顺序性
将消息放入同一个交换机,交给同一个队列,这个队列只有一个消费者,消费者只允许同时开启一个线程

4、RabbitMQ消息重试机制
消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这时候应该如何处理?
答案:使用消息重试机制(SpringBoot默认3次消息重试机制)

如何合适选择重试机制
消费者取到消息后,调用第三方接口,接口无法访问,需要使用重试机制
消费者取到消息后,抛出数据转换异常,不需要重试机制,需要发布者进行解决。

5、SpringBoot消息重试机制
@EnableRetry注解:表示启用重试机制(value表示哪些异常需要触发重试,maxAttempts设置最大重试次数,delay表示重试的延迟时间,multiplier表示上一次延时时间是这一次的倍数)
eg、@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000, multiplier = 1.5))

@Recover注解:当重试次数达到设置的最大次数的时候,程序还是执行异常,调用的回调函数。

6、RabbitMQ死信队列
死信队列是当消息在一个队列因为下列原因:
a、消息被拒绝(basic.reject或basic.nack)并且requeue=false.
b、消息TTL过期
c、队列达到最大长度(队列满了,数据无法添加到mq中)
变成了 “死信队列” 后被重新投递(publish)到另一个Exchange,然后重新消费。说白了就是没有被消费的消息换个地方重新被消费

7、RabbitMQ解决分布式事务

经典案例,以目前流行的外卖为例,用户下单后,调用订单服务,订单服务调用派单系统通知送外卖人员送单,这时候订单系统与派单系统采用MQ异步通讯。

RabbitMQ解决分布式事务原理
答案:采用最终一致性原理

需要保证以下三要素:
a、确保生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)
b、确保消费者能够正确消费消息,采用手动ACK模式(注意重试、幂等性问题)
c、如何保证第一个事务先执行,采用补偿机制,在创建一个补单消费者进行监听,如果订单没有创建成功,进行补单。(如果第一个事务中出错,补单消费者会在重新执行一次第一个事务,例如第一个事务是添加订单表,如果失败在补单的时候重新生成订单记录,由于订单号唯一,所以不会重复)

8、RabbitMQ保证消息不丢失的具体方案
前提:
(1)开启confirm
(2)开启RabbitMQ的持久化(交换机、队列、消息)
(3)关闭RabbitMQ的自动ack(改成手动)
(4)配置消费重试次数,消费重试间隔时间等

涉及到的技术点:
MQ、Redis、定时任务

8.1、保证投放消息不丢失
(1)先将消息放入生产者Redis(此时消息的状态为未投放),再放入队列
(2)根据confirm(ReturnCallback和ConfirmCallback)的结果来确定消息是否投递成功,
投递成功的,修改生产者redis中消息的投递状态为已投递
投递失败的消息将会放入失败的Redis,并从生产者Redis中删除,由定时任务定期扫描并重新投递

(3)生产者Redis定时任务
生产者Redis定时任务专门扫描生产者Redis中存放了一定时间,但是状态还是未投放的消息
此消息会被认为已经投递,但是没有任何反馈结果(由于不可知因素,导致没有ReturnCallback,也没有ConfirmCallback),
此类消息被扫描到后,会放入失败的Redis,并从生产者Redis中删除,由定时任务定期扫描并重新投递
(4)还需要一个专门的定时任务扫描生产者Redis中存放了很久,仍然未消费的数据(状态为已投递),此类消息被扫描到后,会放入失败的Redis,并从生产者Redis中删除,由定时任务定期扫描并重新投递
(5)扫描失败的Redis的定时任务都遵循一条原则,一条消息最多被重新投递三次,若投递了三次仍然失败,则记录日志,记录到数据库,不会再投递,需要人工干预处理

8.2、保证消费消息不丢失
(1)消费者取到消息后,从消息中取出唯一标识,先判断此消息有没有被消费过,若已消费过,则直接ACK(避免重复消费)
(2)正常处理成功后,将生产者Redis中的此消息删除,并ACK(告诉server端此消息已成功消费)
(3)遇到异常时,捕获异常,验证自己在消息中设定的重试次数是否超过阀值,若超过,则放入死信队列,若未超过,则向将消息中的重试次数加1,抛出自定义异常,进入重试机制
(4)有专门的消费者用于处理死信队列中消费多次仍未消费成功的数据,可以记录日志,入库,人工干预处理

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/694353
推荐阅读
相关标签
  

闽ICP备14008679号