赞
踩
消息中间件是在消息的传输过程中保存消息的容器
,消息中间件再将消息从它的源中继到它的目标时充当中间人
的作用。 队列
的主要目的是提供路由并保证消息的传递
,如果发送消息时接收者不可用,消息队列会保留消息
,直到可以成功的传递它为止,当然,消息队列保存消息也是有限期
的。
特点:
1、每个消息只有一个消费者。
2、发送者和接收者没有时间依赖。是直接消费的。
3、接收者确认消息接受和处理成功。
点对点模式常用于邮件服务、短信服务。
特点:
1、每个消息可以有多个订阅者。
2、客户端只有订阅后才能接收到消息。
3、持久订阅和非持久订阅。
持久订阅:生产者和消费者建立关系之后,消息就不会消失,无论消费者是否在线。
非持久订阅:消费者只有一直在线,才能接收消息,只有一个消费者时约等于点对点模型。
RabbitMQ是实现AMQP消息队列和路由功能的进程,内部结构为多个Virtual Host虚拟机,虚拟机中有Exchange和Queue两个组件。 Exchange用于接收生产者发送的消息,Queue用于暂存指定服务器的还未被消费的消息,以备消费者订阅后,提供使用。
RabbitMQ遵循AMQP协议,自身采用Erlang编写。
当Exchange接收到生产者发送的消息,会根据Binding路由规则将消息路由给服务器中的队列。BindingKey将特定的Exchange和特定的队列Queue绑定起来,ExchangeType决定了Exchange路由消息的行为。ExchangeType有direct、Fabout、Topic三种,不同类型ExchangeType行为是不一样的。
AMQP (Advanced Message Queuing Protocol) 是一个提供统一消息服务的应用层通讯协议,为消息中间件提供统一的开发规范。**不同客户端可以将消息投递到中间件上,或从上面获取消息;发送消息和接收消息的客户端可以采用不同的语言开发、不同的技术实现,但必须遵循相同的 AMQP 协议。**AMQP 协议本身包括以下三层:
RabbitMQ 与 AMQP 遵循相同的模型架构。
发布者 (或称为生产者) 负责生产消息并将其投递到指定的交换器上。
消息由消息头和消息体组成。消息头用于存储与消息相关的元数据:如目标交换器的名字 (exchange_name) 、路由键 (RountingKey) 和其他可选配置 (properties) 信息。消息体为实际需要传递的数据。
交换器负责接收来自生产者的消息,并将将消息路由到一个或者多个队列中,如果路由不到,则返回给生产者或者直接丢弃,这取决于交换器的 mandatory 属性:
1)Fanout Exchange: 广播式
将队列绑定到交换机上,一个发送到该交换机上的消息都会转发到所有消息队列上。相当于子网广播。Fanout转发消息是最快的。
2)Direct Exchange: 直接交互式
会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。
3)Topic Exchange:主题式
topic类型的交换器在direct匹配规则上进行了扩展,也是将消息路由到BindingKey和RoutingKey相匹配的队列中。
topic的匹配规则跟direct匹配规则不一样,topic约定:BindingKey和RoutingKey一样都是由"."分隔的字符串;BindingKey中可以存在两种特殊字符“”和“#”,用于模糊匹配,其中"“用于匹配一个单词,”#"用于匹配多个单词(可以是0个)。
4)headers Exchange
headers类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定队列和交换器时指定一组键值对,当发送的消息到交换器时,RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果匹配,消息就会路由到该队列。headers类型的交换器性能很差,基本上不推荐使用。
交换器与队列通过 BindingKey 建立绑定关系。exchange 和queue 之间的虚拟连接,binding 中可以包含routingkey。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
生产者将消息发给交换器的时候,一般会指定一个 RountingKey,用来指定这个消息的路由规则。当 RountingKey 与 BindingKey 基于交换器类型的规则相匹配时,消息被路由到对应的队列中。
用于存储路由过来的消息。多个消费者可以订阅同一个消息队列,此时队列会将收到的消息将以轮询 (round-robin) 的方式分发给所有消费者。即每条消息只会发送给一个消费者,不会出现一条消息被多个消费者重复消费的情况。
消费者订阅感兴趣的队列,并负责消费存储在队列中的消息。为了保证消息能够从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制 (message acknowledgement),并通过 autoAck 参数来进行控制:
用于传递消息的 TCP 连接。
RabbitMQ 采用类似 NIO (非阻塞式 IO ) 的设计,通过 Channel 来复用 TCP 连接,并确保每个 Channel 的隔离性,就像是拥有独立的 Connection 连接。
Channel 是在 Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel 之间是完全隔离的。
**RabbitMQ 通过虚拟主机来实现逻辑分组和资源隔离,一个虚拟主机就是一个小型的 RabbitMQ 服务器,拥有独立的队列、交换器和绑定关系。**用户可以按照不同业务场景建立不同的虚拟主机,虚拟主机之间是完全独立的,无法将 vhost1 上的交换器与 vhost2 上的队列进行绑定,这可以极大的保证业务之间的隔离性和数据安全。默认的虚拟主机名为 /
。
这种隔离性设计是出于多租户和安全因素设计的,把AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQserver 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建exchange/queue 等。
接收和分发消息的应用,RabbitMQ Server就是 Message Broker。
RabbitMQ 支持多种交换器类型,常用的有以下四种:fanout、direct、topic、headers。
这是最简单的一种交换器模型,此时会把消息路由到与该交换器绑定的所有队列中。任何发送到 X 交换器上的消息,都会被路由到属于 X 交换器上的 Q1 和 Q2 两个队列上。
任何发送到 X 交换器上的消息,把消息路由到 BindingKey 和 RountingKey 完全一样的队列中。当消息的 RountingKey 为 orange 时,消息会被路由到 Q1 队列;当消息的 RountingKey 为 black 或 green 时,消息会被路由到 Q2 队列。
需要特别说明的是一个交换器绑定多个队列时,它们的 BindingKey 是可以相同的,例如:此时当消息的 RountingKey 为 black 时,消息会同时被路由到 Q1 和 Q2 队列。
将消息路由到 BindingKey 和 RountingKey 相匹配的队列中,匹配规则如下:
.
进行连接;#
和 *
。其中 *
用于匹配一个单词, #
用于匹配零个或者多个单词。例如:*.orange、orange.#。
路由键说明:
- 路由键为
lazy.orange.elephant
的消息会发送给所有队列;- 路由键为
quick.orange.fox
的消息只会发送给 Q1 队列;- 路由键为
lazy.brown.fox
的消息只会发送给 Q2 队列;- 路由键为
lazy.pink.rabbit
的消息只会发送给 Q2 队列;- 路由键为
quick.brown.fox
的消息与任何绑定都不匹配;- 路由键为
orange
或quick.orange.male.rabbit
的消息也与任何绑定都不匹配。
**在交换器与队列进行绑定时可以指定一组键值对作为 BindingKey;在发送消息的 headers 中的可以指定一组键值对属性,当这些属性与 BindingKey 相匹配时,则将消息路由到该队列。**同时还可以使用 x-match
参数指定匹配模式:
headers 类型的交换器性能比较差,因此其在实际开发中使用得比较少。
**RabbitMQ 中另外一个比较常见的概念是死信队列。当消息在一个队列中变成死信 (dead message) 之后,它可以被重新被发送到死信交换器上 (英文为 Dead-Letter-Exchange,简称 DLX ),任何绑定死信交换器的队列都称之为死信队列。**需要特别说明的是死信交换器和死信队列与正常的交换器和队列完全一样,采用同样的方式进行创建,它们的名称表达的是其功能,而不是其类型。
一个正常的消息变成死信一般是由于以下三个原因:
- 消息被拒绝 (Basic.Reject/Basic.Nack) ,井且设置重回队列的参数 requeue 为 false;
- 消息过期;
- 队列达到最大长度。
**可以在队列创建的 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数来为正常队列添加死信交换器,当该队列中存在死信时,死信就会被发送到死信交换器上,进而路由到死信队列上。**示例如下:
// 创建死信交换器
channel.exchangeDeclare("exchange.dlx", "direct");
// 声明死信队列
channel.queueDeclare(" queue.d1x ", true, false, false, null);
// 绑定死信交换器和死信队列
channel.queueBind("queue.dlx ", "exchange.dlx ", "routingkey");
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "exchange.dlx");
// 为名为 myqueue 的正常队列指定死信交换器
channel.queueDeclare("queue.normal", false, false, false, args);
//除此之外,还可以重新指定死信的路由键,如果没有指定,则默认使用原有的路由键,重新设置的方法如下:
args.put("x-dead-letter-routing-key", "some-routing-key");
默认情况下RabbitMQ的队列和交换机在RabbitMQ服务器重启之后会消失,原因在于队列和交换机的durable属性,该属性默认情况下为false。能从AMQP服务器崩溃中恢复的消息称为持久化消息,如果想要从崩溃中恢复那么消息必须
消息写入磁盘性能差很多,没有采用缓冲区刷盘机制,所以除非特别关键的消息会使用。
对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。
如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器接收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。
消费者需要监听不可到达消息,并设置默认返回监听。也可以把不可到达的小鞋添加到死信队列,用于后续追踪。
在高并发的时候,瞬间产生的流量很大,消息很大,而MQ有个重要的作用就是限流,限流则是消费端做的。RabbitMQ提供了一种Qos(服务质量保证)功能,即在非自动确认消息的前提下,在一定数量的消息未被消费前,不进行消费新的消息。
注意: autoAck设置为false, 一定要手工签收消息
// prefetchSize消息的限制大小,一般设置为0,在生产端限制
// prefetchCount 我们一次最多消费多少条消息,一般设置为1
// global,一般设置为false,在消费端进行限制
channel.basicQos(int prefetchSize, int prefetchCount, boolean global)
// 使用
channel.basicQos(0, 1, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));
rabbitMQ为自带了消息重试机制:当消费者消费消息失败时,可以选择将消息重新“推送”给消费者,直至消息消费成功为止。
//开启自带的重试机制,需要如下几个配置:
//1 开启消费者手动应答机制,对应的springboot配置项:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
//2 消费异常时,设置消息重新入列
boolean multiple = false; // 单条确认
boolean requeue = true; // 重新进入队列,谨慎设置!!!很容易导致死循环,cpu 100%
**延迟队列基于rabbitmq_delayed_message_exchange插件,实现延迟队列效果。它是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。**使用延迟队列,可以有效解决定时任务带来的系统压力以及业务处理时效性等问题。
优先级队列,也就是具有高优先级的队列,优先级高的消息具备优先被消费的特权。通过队列的 x-max-priority 参数设置队列的最大优先级,之后在发送消息时通过 priority 属性再设置当前消息的优先级。优先级应在 0 和 255 之间,推荐1 ~ 10。
惰性队列会尽可能地将消息存入磁盘中,而在消费者消费消息时才会被加载到内存中,它支持更多的消息存储。
队列具备两种模式:default 和 lazy。默认的为 default 模式,在队列声明的时候可以 通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。
在某个Broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。
默认情况下,内存到达的阈值是50%时就会换页处理。也就是说,在默认情况下该内存的阈值是0.4的情况下,当内存超过0.4*0.5=0.2时,会进行换页动作。
当磁盘的剩余空间低于确定的阈值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃。
默认情况下:**磁盘预警为50MB的时候会进行预警。表示当前磁盘空间第50MB的时候会阻塞生产者并且停止内存消息换页到磁盘的过程。**这个阈值可以减小,但是不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间的检查空隙内,第一次检查是:60MB ,第二检查可能就是1MB,就会出现警告。
RabbitMQ可以通过三种方法来部署分布式集群系统,分别是:cluster,federation,shovel
RabbitMQ集群会始终同步四种类型的内部元数据(类似索引):
RabbitMQ要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或者离开集群时,必须要将该变更通知到至少一个磁盘节点。
如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,但是无法进行其他操作(包括创建队列、交换器、绑定,添加用户、更改权限、添加和删除集群结点),直到节点恢复。
解决方案:设置两个磁盘节点,至少有一个是可用的,可以保存元数据的更改。
一,存储空间,如果每个集群节点都拥有所有Queue的完全数据拷贝,那么每个节点的存储空间会非常大,集群的消息积压能力会非常弱(无法通过集群节点的扩容提高消息积压能力);
二,性能,消息的发布者需要将消息复制到每一个集群节点,对于持久化消息,网络和磁盘同步复制的开销都会明显增加。
单节点的 RabbitMQ 存在性能上限,可以通过垂直或者水平扩容的方式增加 RabbitMQ 的吞吐量。垂直扩容指的是提高 CPU 和内存的规格;水平扩容指部署 RabbitMQ 集群。在 3.8 以前的版本,RabbitMQ 通过镜像队列(Classic Queue Mirroring)来提供高可用性。但镜像队列存在很大的局限性,在 3.8 之后的版本 RabbitMQ 推出了 Quorum queues 来替代镜像队列,在之后的版本中镜像队列将被移除。
**镜像队列通过将一个队列镜像(消息广播)到其他节点的方式来提升消息的高可用性。当主节点宕机,从节点会提升为主节点继续向外提供服务。**配置镜像队列规则后,新创建的队列按照规则成为镜像队列。每个镜像队列都包含一个主节点(Leader)和若干个从节点(Follower),其中只有主节点向外提供服务(生产消息和消费消息),从节点仅仅接收主节点发送的消息。从节点会准确地按照主节点执行命令的顺序执行动作,所以从节点的状态与主节点应是一致的。
ha-sync-mode | 说明 |
---|---|
manual | 这是默认模式。新队列镜像将不接收现有消息,它只接收新消息。一旦使用者耗尽了仅存在于主服务器上的消息,新的队列镜像将随着时间的推移成为主服务器的精确副本。如果主队列在所有未同步的消息耗尽之前失败,则这些消息将丢失。您可以手动完全同步队列,详情请参阅未同步的镜像部分。 |
automatic | 当新镜像加入时,队列将自动同步。值得重申的是,队列同步是一个阻塞操作。如果队列很小,或者您在RabbitMQ节点和ha-sync-batch-size之间有一个快速的网络,那么这是一个很好的选择。 |
**镜像队列主节点出现故障时,最老的从节点会被提升为新的主节点。**如果新提升为主节点的这个副本与原有的主节点并未完成数据的同步,那么就会出现数据的丢失,而实际应用中,出现数据丢失可能会导致出现严重后果。
rabbitmq 提供了 ha-promote-on-shutdown
,ha-promote-on-failure
两个参数让用户决策是保证队列的可用性,还是保证队列的一致性;两个参数分别控制正常关闭、异常故障情况下从节点是否提升为主节点,其可设置的值为 when-synced
和 always
。
ha-promote-on-shutdown/ha-promote-on-failure | 说明 |
---|---|
when-synced | 从节点与主节点完成数据同步,才会被提升为主节点 |
always | 无论什么情况下从节点都将被提升为主节点 |
**RabbitMQ中的每个队列都有一个主队列。该节点称为队列主服务器。所有队列操作首先经过主队列,然后复制到镜像。**这对于保证消息的FIFO排序是必要的。通过在策略中设置 queue-master-locator
键的方法可以定义主队列选择策略。
可选参数列表
queue-master-locator | 说明 |
---|---|
min-masters | 选择承载最小绑定主机数量的节点 |
client-local | 选择客户机声明队列连接到的节点 |
min-masters | 随机选择一个节点 |
需要考虑网络IO、磁盘IO、以及CPU相关
镜像队列同时支持生产者确认和事务机制。在事务机制中,只有当前事务在全部镜像中执行之后,客户端才会收到 Tx.Commit-OK
的消息。同样的,在生产者确认机制中,生产者进行当前消息确认的前提是该消息被全部镜像接收。
RabbitMQ 使用信用证机制限制消息生产的速度。当生产者收到队列的所有镜像授予的信用时,才允许发送新的消息。如果有镜像没有授予生产者信用,会导致生产者生产阻塞。生产者会一直被阻塞,直到所有镜像都授予它信用值,或者有的镜像从集群中断开。
Erlang 通过定时向所有节点发送心跳的方式检测断开的情况。发送心跳的间隔可以用 net_ticktime 来控制。
生产者,消费者连接到 RabbitMQ 后,在 RabbitMQ 内部会创建对应的 Connection,Channel 进程。
Connecton 进程从 socket 上接收生产者发送的消息后投递到 Channel 进程。在 Channel 进程中,根据消息发送的 exchange 与消息的 routing-key,在内部数据库的路由表中,查找所有匹配的 Queue 的进程 PID,然后将消息投递到Queue 的进程中。
**在镜像队列的情况下,Channel 进程除了将消息发送给队列的 Leader 进程外,还会将消息发送给队列所有的 Follower 进程,而 Follower 进程都在远端节点上,因此这里就多了一次集群间的网络交互。 **
RabbitMQ 是采用 GM(组播)算法实现确认消息,镜像队列中的 Leader 和所有 Follower 都会发送一次消息和接收一次消息,同时还会发送一次对消息的 ACK,和接收一次消息的 ACK。
简单来说就是生产者发送一条消息,队列 Leader 进程所在节点会收到两次:一次是生产者发送的,一次是队列 Follower 进程发送的;同样也会将消息对外发送两次:一次是生产者对应的 Channel 进程将消息发送给队列的 Follower 进程;一次是队列的 Leader 进程进行广播同步将消息发送给 Follower 进程。
rabbitmqctl cluster_status
检查集群健康状态,不正常节点重新加入集群 RabbitMQ可以用于分布式系统之间通信。内部采用AMQP协议对流量可以进行很好的肖峰,交换器有多种类型,可以对不同业务场景的消息采用不同的交换器类型。RabbitMQ 提供了消息确认机制,支持消息重试,也支持事务型消息,如果txCommit()提交了,则该消息一定会持久化。但是会存在消息未成功消费,消息丢失情况。
消息的持久化必须是三个环节都进行持久化(投递模式、持久化交换机、持久化队列),而且持久化不是采用通过缓冲区刷盘到磁盘中,所以一旦启动持久化的消息,在性能方面非常差。不论是持久化的消息还是非持久化的消息都可以写入到磁盘中,只不过非持久的是等内存不足的情况下才会被写入到磁盘中。
另外RabbitMQ也支持高可用模式,有集群方案。集群有多种方式,集群的从节点可以为磁盘节点或者内存节点。集群之后服务节点之间的消息一致性,采用 GM(组播)算法实现确认消息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。