赞
踩
Producer:生产者,投递消息的一方
Consumer:消费者,接收消息的一方
Broker:消息中间件的服务节点,对于RabbitMq而言,一个Broker可以看作一个RabbitMq服务节点
Queue:队列,RabbitMq的内部对象,用于存储消息。这一点与Kafka相反,Kafka将消息存储在topic这个逻辑层面,而对应的队列逻辑只是topic实际存储文件中的位移标识。
生产者生产消息最终投递到队列中,多个消费者可以订阅同一个队列,队列中的消息会被平均分摊到多个消费者进行处理,而非每个消息者都收到所有消息。
Exchange:交换器,生产者的所有消息都是发送到交换器,由交换器路由到一个或多个队列中。
RoutingKey:路由键,生产者将消息发送给交换机的时候一般会指定一个RoutingKey,用于指定这个消息的路由规则。
BindingKey:绑定key,RabbitMq通过绑定将交换机与队列关联起来,绑定的时候一般会指定一个绑定key,结合路由键使路由规则生效。
交换器有四种类型:fanout:把消息直接发送到所有与该交换器绑定的队列中
direct:把消息路由到BindingKey和RoutingKey完全匹配的队列
topic:BindingKey和RoutingKey模糊匹配。BindingKey中“*”匹配一个单词,“#”匹配多个单词
header:根据headers属性进行匹配
Connection:生产者或消费者与RabbitMqBroker建立的tcp连接
Channel:一旦tcp连接建立起来,客户端就可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的id,信道是建立在Connection之上的虚拟连接,RabbitMq处理每条AMQP指令都是通过信道完成的。
RabbitMq运转流程:
实际开发中,一般会先在RabbitMq Management创建好交换器和队列并绑定好。
消费者接收消息的过程类似
通过以上总结我们大概了解了RabbitMq的整体概念,实际中还有很多细节需要注意。
比如,当交换器无法找到符合条件的队列时,会怎么处理?这就涉及channel.basicPublish方法中的mandatory和immediate两个参数。
mandatory参数:当设置mandatory参数为true时,如果交换机无法找到符合要求的队列,RabbitMq broker会调用Basic.Return命令将消息返回给生产者,设置为false时,则消息直接被丢弃。
生产者可通过调用channel.addReturnListener来添加ReturnListener监听器获取相关消息。
immediate参数:当immediate参数设置为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息不会存入队列中,如果所有匹配的队列都没有消费者,该消息会通过Basic.Return返回到生产者。
(该参数在3.0版本后已经不再支持,建议采用TTL和DLX的方法替代)
如果不设置mandatory参数,那么消息在未被路由的情况下将会产生丢失,设置了mandatory参数,生产者又需要添加ReturnListener的编辑逻辑,较为麻烦,可以采用备份交换器的方式进行处理。
在声明交换器的时候添加alternate-exchange参数来实现,如果交换器设置了备份交换器,在找不到匹配的队列时候,消息会被发送到备份交换器并路由到对应队列。备份交换器的类型一般为fanout类型。
=========================================================================
前面提到了采用TTL和DLX的方法替代immediate参数,那么TTL和DLX是什么?
TTL(Time-To_Live)即 一条消息在队列中的最大存活时间,RabbitMq可对消息和队列设置TTL。在一条在队列中超过配置的 TTL 的消息称为已死消息。但是需要注意的是,已死消息并不能保证会立即从队列中删除,但是能保证已死的消息不会被投递出去。
设置TTL有两种方式:给队列设置 x-message-ttl,所有被投递到该队列的消息都会在到达TTL时成为已死消息。
针对每条消息设置TTL,此时需要注意的时,当消息达到 TTL 时,可能不会马上被丢弃,因为只有处于队列头部消息过期后才会被丢弃,假如队列头部的消息没有设置TTL,而第 2 条消息设置了 TTL,那么即使第2条消息成为了已死消息,也必须要等到队列头部的消息被消费之后才会被丢弃,而已死消息在被丢弃之前也会被计入统计数据(比如队列中的消息总数)。因为每条消息是否过期是在即将投递到消费者之前判的。
DLX(Dead-Letter-Exchange)死信交换器,当消息在一个队列中变成死信之后,它能被重新发送到另外一个交换器,这个交换器就是DLX,绑定DLX的队列就是死信队列。
通过在channel.queueDeclare方法中设置x-dead-letter-exchange参数为队列添加DLX
消息变成死信的情况:
利用DLX配合将消息的TTL设置为0可以弥补immediate参数的功能。
延迟队列:主要解决消息被发送后,需要等待特定时间后才被消费者消息的场景。正是通过DLX和TTL实现延迟队列的功能。这个也是实现闹钟服务功能的一种方式。
但是这种方式存在一定的缺陷,由于我们的业务场景每次创建闹钟的响铃时延都不一样,因此需要对每条消息而非队列设置TTL ,使用这种方式设置的 TTL,消息可能不会按时死亡,在上面有提到存在这样的场景,第一个消息设置了 20s 的 TTL,第二个消息设置了 10s 的 TTL,那么 RabbitMQ 会等到第一个消息过期之后,才会让第二个消息过期。要解决这个问题需要安装特殊的插件。
优先级队列:通过设置队列的x-max-priority参数设置队列的最大优先级,发送消息时设置当前消息的优先级,优先级高的消息具备优先被消费的特权。(如果消费者的消费速度大于生产者的速度,且Broker中没有消息堆积情况,对发送的消息设置优先级就没有实际意义了)
=========================================================================
以上介绍了TTL、DLX、延迟队列等概念,接下来考虑一下当RabbitMq出现异常(重启、关闭、宕机)情况,如果不做任何处理势必导致数据丢失,因此“持久化”也是重要的功能。
RabbitMq的持久化包括:交换器持久化、队列的持久化、消息持久化
交换器持久化:在声明交换器时将durable参数设置为true,如果交换器不设置持久化,在服务重启后,交换器的元数据会丢失,对于长期使用的交换器最好设置为持久化
队列持久化:在声明队列的时候将durable参数设置为true,如果队列不设置持久化,服务重启后队列的元数据会丢失,消息也会丢失
消息持久化:设置队列的持久化能保证队列本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失,要确保消息不丢失,需要设置消息持久化,通过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2即可实现消息的持久化。
通过持久化的设置可以解决因服务器的异常崩溃而导致的消息丢失,这样是否就能保证消息不丢失?答案是否定的,我们接下来通过消息发送的流程来分析一下。
1、首先是消息从生产者发送到Broker,如果消息在达到Broker之前已经丢失,那么持久化也解决不了问题,我们需要确认消息是否正确到达了服务器。
RabbitMq提供了两种方式解决这个问题:事务机制和发送方确认方式 ,这两种方式是互斥的
事务机制:发送消息前开启事务,发送后提交事务,如果在事务提交执行之前抛出异常,可以将其捕获,执行channel.txRollback来实现事务回滚。开启事务之后,有一个致命的缺点就是发送消息流程会被阻塞。也就是说必须一条消息发送成功之后,才会允许发送另一条消息。
rabbitTemplate.setChannelTransacted(true);//开启事务
发送方确认方式
由于事务机制的是同步等待的方式,因此不建议在生产环境开启事务机制。另外一种方式是发送方确认机制,生产者将信道设置成confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID,消息投递到匹配的队列后,RabbitMq会发送一个确认(Basic.Ack)给生产者(包含消息的唯一Id)。相对于事务机制,发送方确认机制最大的好处在于它是异步的,如果RabbitMq因为自身内部错误导致消息丢失,就会发送一条nack命令。
2、如果消息无法从交换器路由到正确队列,需要证明处理,这就可以使用前面介绍的设置mandatory参数,添加ReturnListener进行处理,也可以使用备份交换器的方式进行处理。
3、当消息安全达到队列后,就需要持久化的功能防止消息的丢失
4、当消息终于到达消费者,出现其它意外导致消费失败,那么还是可能导致数据的丢失。此时需要消费者确认(ack)的功能保证消息被成功消费。消费者默认在收到消息后会给服务器一个应答,服务端收到消费者的应答之后,就会删除消息。开启消费者确认后,需要消费者手动进行应答确认,服务器才会删除消息。
=========================================================================
不管是持久化的消息还是非持久化的消息都可以被写入磁盘,持久化的消息在到达队列时就被写入磁盘,在内存不吃紧的情况下,持久化消息也会在内存中保存一份备份。而非持久化的消息一般保存在内存中,在内存吃紧的时候也会被换入磁盘中。这两种类型的消息的落盘处理都在RabbitMq的
“持久层”中完成。
持久层是一个逻辑上的概念,实际包括队列索引和消息存储两个部分。功能如下图
ETS:记录消息在文件中的位置映射和文件的相关信息,删除消息只是先从ETS表删除指定消息的相关信息,同时更新存储文件的相关信息,即先标记为垃圾数据,后续再进行删除或者合并
消息实际存储的地方在队列,有必要了解一下队列的结构。
通常队列由rabbit_amqqueue_process和backing_queue这两部分组成,
rabbit_amqqueue_process:负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(生产端的confirm和消费端的ack)等
backing_queue:是消息存储的具体形式和引擎
如果消息投递的目的队列非空且有消费者,那么消息会直接发送到消费者,不会经过队列这一步。而消息无法直接投递给消费者时,会将消息存入队列。消息存入队列后,消息的状态会不断改变,可能存在以下4种状态:
Alpha: 消息内容(消息体、属性和headers)和消息索引都存在内存中
Beta: 消息内容保存在磁盘中,消息索引在内存
Gamma:消息内容保存在磁盘中,消息索引在磁盘和内存都有
Delta: 消息内容和索引都在磁盘
对于普通的没有设置优先级和镜像的队列来说,backing_queue的默认实现是rabbit_variable_queue,其内部通过5个子队列Q1、Q2、Delta、Q3和Q4来体现消息的各个状态。
Q1、Q4只包含alpha状态
Q2、Q3包含beta,gamma状态
消息根据负载的情况进行流动,在负载高的情况下存储到磁盘,在负载低的情况下存储到内存,使队列具有很好的弹性。具体的流动过程较为复杂,有兴趣可以查看《RabbitMQ实战》一书。
=========================================================================
RabbitMq从3.6.0版本引入了惰性队列的概念,惰性队列会尽可能地将消息存入磁盘中,而消费者消费到相应消息时才会被加载到内存中,当消费者由于各种原因致使长时间内不能消费消息而造成堆积时,惰性队列就很有必要了。
在声明队列的时候通过x-queue-mode参数可以设置队列的模式,取值为default和lazy,lazy模式则为惰性队列。
=========================================================================
镜像队列
引入镜像队列的机制,可以将队列镜像到集群中的其它Broker节点之上,如果集群中其中一个节点失效了,队列能够自动地切换到镜像的另一个节点上以保证服务的可用性。镜像队列的主节点(mater)和从节点(slave)结构如图。
如果master由于某种原因失效,那么“资历最老”的slave会被提升为新的master。根据slave加入的时间排序,时间最长的slave即为“资历最老”。
如果消费者与slave建立连接并进行订阅消费,实际上都是从master获取资源,master在将结果返回给slave,最后由slave投递给消费者。所有动作都只会向mater发送,再由master将命令执行的结果广播给各个slave。
由于这种特性,要实现负载均衡,需要每个队列的mater根据实际消息的数量均匀分布在不同的Broker上,如果每个队列都建立在同个Broker,那么该节点的压力会很大,无法达到负载均衡的效果。
注意要点:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。