赞
踩
运维人员创建交换器和队列的时机?
- 一个成熟的公司,在实际的生产环境中,交换器、队列和绑定关系的创建都是在开发人员的程序代码中封装好的,运维人员在搭建平台时,运行了相应的程序包就会自动创建业务中需要使用的交换器、队列等信息。这是看每个公司的产品经理对业务架构的设计是否成熟。
- 架构设计之初,需要跟资深运维预估业务线的实际盛产环境下的队列的流量、内存占用及网卡占用的平均值和峰值,以便在固定硬件资源的情况下能够进行合理有效的分配。
- 运维人员在平台搭建好后,只需要在web页面临时创建个测试交换器和队列,对接入进来的业务数据进行测试,测试程序是否正常流转即可,只有在这种情况下才会用到手动创建交换器、队列和绑定关系的,除非你们公司才刚刚起步,需要运维架构师参与架构设计。该篇文章只讲偏运维的知识,不设计开发向。
为什么架构设计之初需要预估队列的使用情况?
- 因为rabbitmq的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会。所以如果要衡量rabbitmq当前的每秒查询率,只需看队列的即可。
提前声明交换器等信息的好处:
- 业务程序也可以免去声明的过程,直接使用即可,提高业务运转效率。
- 可以确保交换器和队列之间正确地绑定匹配,防止人为因素、代码缺陷导致消息丢失。
- 提高运维效率,方便管理服务器资源。
- 如果在后期运行过程中超过预定的阈值,可以根据实际情况对当前集群进行扩容或者将相应的队列迁移到其他集群。迁移的过程也可以对业务程序完全透明。
- 如果集群资源充足,而即将使用的队列所占用的资源又在可控的范围之内,为了增加业务程序的灵活性,也完全可以在业务程序中声明队列。
- Name:交换器名称。
- Type:交换器的类型,fanout、 direct、topic、headers共4种,每种类型作用不同,可以参考前面章节。
- Durability:设置是否持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
- durable:代表持久化。
- transient:代表非持久化。
- Auto delete:设置是否自动删除。
- Yes,表示自动删除。
- No,默认为no。
- 自动删除前提:是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或交换器都与此解绑。注意不能错误地把这个参数理解为:“当与此交换器连接的客户端都断开时,RabbitMQ会自动删除本交换器。
- internal:设置是否是内置的。
- Yes,表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
- No,默认为no。
- Arguments:添加一些其他结构化参数,比如:Add alternate-exchange(备份交换机)
- 备份交换器,英文名称为 Alternate Exchange,简称AE。
- 什么情况下使用备份交换器?
- 当开发人员写代码不会使用mandatory 参数,或者不想把代码写的很复杂,同时又要保证消息在未被路由的情况下不丢失,此种情况下就可以使用备份交换器。
- 备份交换器可以将未被路由的消息存储在rabbitmq中,再在需要的时候去处理这些消息。
- 当同时使用备份交换器和Policy策略时,则前者优先级更高,会覆盖掉 Policy 的设置。
- 现在有两个交换器,信息如下:
- 第一个交换器名称:qingjun_exchange,类型为direct,通过绑定键qingjun_key绑定到qingjun_queue队列。
- 第二个交换器名称:qingjun_bck_exchange,类型为fanout,通过绑定键qingjun_bck_key绑定到qingjun_bck_queue队列。
- 将qingjun_bck_exchange设置为qingjun_exchange的备份交换器。
原理流程:
- 发送第一条消息"wuhan"到qingjun_exchange交换器时,指定的路由键为qingjun_key,此时可以完全匹配到绑定键qingjun_key,消息“wuhan”进入qingjun_queue队列。
- 发送第二条消息“wuhan_1”到qingjun_exchangej交换器时,指定的路由键为qingjun1_key,此时不能完全匹配到绑定键qingjun_key,消息“wuhan_1”给到qingjun_bck_exchange备份交换器,通过绑定键qingjun_bck_key进入到qingjun_queue队列里,并显示信息是从qingjun_exchange交换器发过来的。
第一步,给qingjun_exchange发送消息"wuhan",指定路由键为qingjun_key。
第二步,查看qingjun_queue队列里收到qingjun_exchange发来的消息"wuhan"。
3. 第三步,给qingjun_exchange发送消息"wuhan_1",指定路由键为qingjun1_key,,或者不指定,这样就没法完全匹配到绑定键qingjun_key,消息转给qingjun_bck_exchange交换器。
- 备份交换器得慎用,开发人员使用时还需要了解mandatory参数的结合使用,比较复杂,总结了以下几种特殊情况:
- 如果设置的备份交换器不存在,客户端和rabbitmq服务端都不会有异常出现,此时消息会丢失。
- 如果备份交换器没有绑定任何队列,客户端和rabbitmq服务端都不会有异常出现,此时消息会丢失。
- 如果备份交换器没有任何匹配的队列,客户端和rabbitmq服务端都不会有异常出现,此时消息会丢失。
- 如果备份交换器和 mandatory 参数一起使用,那么 mandatory 参数无效。
- Type:队列类型。不同类型创建的队列可以选择的参数不同。
- Default for virtual host:虚拟主机立的所有队列。
- Classic:主队列。
- Quorum:仲裁队列。
- Stream:流队列。
- Name:队列名称。
- Durability:设置是否持久化。持久化可以将队列存盘,在服务器重启的时候不会丢失相关信息。
- durable:代表持久化。
- transient:代表非持久化。
- Node:设置在集群中的哪个节点上。
- Auto delete:设置是否自动删除。(主队列类型存在)
- Yes,则设置队列为自动删除。
- No,默认设置,不自动删除。
- 自动删除前提:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
Default for virtual host参数:
- Auto expire : 队列生存期,单位毫秒,队列多长时间没有被使用(访问)就会被删除。换句话说就是,当队列在指定的时间内没有被使用(访问)就会被删除。
- Message TTL:消息生存期,单位毫秒。可以用作延迟队列,消息延迟消费等场景。
- Overflow behaviour:设置队列溢出行为,队列中的消息溢出后如何处理。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head。
- Single active consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)。
- Dead letter exchange:死信队列交换机名称,过期或溢出被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中。
- Dead letter routing key:死信消息路由键,当消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值。
- Max length:队列最大长度,可以理解为队列可以容纳的消息的最大条数。超过该最大值,则将从队列头部开始删除消息。
- Max length bytes:队列消息内容占用对打空间,可以理解为队列可以容纳的消息的最大字节数,受限服务器内存大小,超过该阈值则从队列头部开始删除消息。
- Leader locator:设置在节点集群上声明队列前导时定位的规则。有效值为client-local(默认值)和balanced。
Classic参数:
- Auto expire : 队列生存期,单位毫秒,队列多长时间没有被使用(访问)就会被删除。换句话说就是,当队列在指定的时间内没有被使用(访问)就会被删除。
- Message TTL:消息生存期,单位毫秒。可以用作延迟队列,消息延迟消费等场景。
- Overflow behaviour:设置队列溢出行为,队列中的消息溢出后如何处理。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head。
- Single active consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)。
- Dead letter exchange:死信队列交换机名称,过期或溢出被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中。
- Dead letter routing key:死信消息路由键,当消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值。
- Max length:队列最大长度,可以理解为队列可以容纳的消息的最大条数。超过该最大值,则将从队列头部开始删除消息。
- Max length bytes:队列消息内容占用对打空间,可以理解为队列可以容纳的消息的最大字节数,受限服务器内存大小,超过该阈值则从队列头部开始删除消息。
- Maximum priority:最大优先级,数字越大,越优先消费,可以针对实时性强的消息优先消费。队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
- Lazy mode:将队列设置为延迟模式成为懒队列,在磁盘上保留尽可能多的消息,以减少RAM的使用。如果未设置,队列将保留内存缓存以尽可能快地传递消息。
- Version:设置队列版本。默认为版本1。版本1有一个嵌入小消息的基于日志的索引。版本2有一个不同的索引,在许多情况下提高了内存使用和性能,以及以前嵌入的消息的每个队列存储。
- Master locator : 在集群模式下设置镜像队列的主节点信息,将队列设置为主位置模式,确定在节点集群上声明时队列主位置所依据的规则
Quorum参数:
- Auto expire : 队列生存期,单位毫秒,队列多长时间没有被使用(访问)就会被删除。换句话说就是,当队列在指定的时间内没有被使用(访问)就会被删除。
- Message TTL:消息生存期,单位毫秒。可以用作延迟队列,消息延迟消费等场景。
- Overflow behaviour:设置队列溢出行为,队列中的消息溢出后如何处理。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head。
- Single active consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)。
- Dead letter exchange:死信队列交换机名称,过期或溢出被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中。
- Dead letter routing key:死信消息路由键,当消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值。
- Max length:队列最大长度,可以理解为队列可以容纳的消息的最大条数。超过该最大值,则将从队列头部开始删除消息。
- Max length bytes:队列消息内容占用对打空间,可以理解为队列可以容纳的消息的最大字节数,受限服务器内存大小,超过该阈值则从队列头部开始删除消息。
- Delivery limit:允许不成功交付尝试的次数。一旦消息传递失败超过这个次数,它将被丢弃或变成死信,这取决于队列配置。
- Initial cluster size:设置队列初始集群大小。
- Dead letter strategy :可选项为at-most-once 和at-least-once。默认为“最多一次”。此设置仅由仲裁队列使用。如果设置了at-least-once,则溢出行为必须设置为拒绝发布。否则,死信策略将退回至多一次。
- Leader locator:设置在节点集群上声明队列前导时定位的规则。有效值为client-local(默认值)和balanced。
Stream参数:
- Max length bytes:队列消息内容占用对打空间,可以理解为队列可以容纳的消息的最大字节数,受限服务器内存大小,超过该阈值则从队列头部开始删除消息。
- Max time retention:设置流队列的数据保留时间。可选参数有:
- Y:年
- M:月
- D:日
- h:小时
- m=分钟
- s=秒
- 比如,"1h"配置流只保留最近1小时收到的消息。
- Initial cluster size:设置队列初始集群大小。
- Leader locator:设置在节点集群上声明队列前导时定位的规则。有效值为client-local(默认值)和balanced。
- 在交换器页面绑定需要提前知道要绑定的那个队列,换句话说绑定队列需要提前存在,如果这个队列不存在会显示绑定失败。
- 所以我们一般都是在队列页面绑定。
2. 创建baimu_queue队列,再与qingjun_exchange交换器绑定成功。
3. 也可以在队列界面查看绑定信息。
- 运维人员都是在队列页面绑定,因为在队列页面可以一眼看出有哪些队列,点进我们要绑定的队列,直接与已存在的交换器绑定就行。
- 交换器与交换器也能绑定,A交换器与B交换器绑定后,消息会到B交换器绑定的队列中。
- B交换器的类型有限制,不然此种路由消息会失败。
2. baimu_exchange交换器与baimu_queue队列绑定,绑定键为qingjun——baimu_key。
为什么会有消息确认机制?
- 为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制。消费者在订阅队列时,可以指定 autoAck 参数,这个参数属于开发向的,运维可以了解下。
- 也同时存在消息拒绝机制,这个完全就是开发向的,我这里不讲,感兴趣的同学可以自行bilibili。
消息确认机制怎么用?
- 当 autoAck=false时,rabbitmq会等待消费者显式地回复确认信号后才从内存或者磁盘中移去消息,实质上是先打上删除标记,之后再删除。这样一来,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题。
- 当 autoAck=true时,rabbitmq会自动把发送出去的消息置为确认,然后从内存或者磁盘中删除,而不管消费者是否真正地消费到了这些消息。
队列中的消息状态:
- 当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:
- 一部分是等待投递给消费者的消息,俗称“消息挤压”,在web监控页面显示的是Ready部分的消息数。
- 另一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息,在web监控页面显示的是Unacked部分的消息数。
未确认消息怎么处理?为什么?
- 如果rabbitmq一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则rabbitmq会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。
- rabbitmq不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是rabbitmq允许消费者消费一条消息的时间可以很久很久。
- Ready:显示等待投递给消费者的消息。
- Unacked:显示已经投递给消费者,但是还没有收到消费者确认信号的消息。
- 可以通过查看队列状态来看消息状态,其中带有messages相关参数就是查看消息状态的。
[root@node2 ~]# rabbitmqctl list_queues --help
[root@node2 ~]# rabbitmqctl list_queues name messages_ready
[root@node2 ~]# rabbitmqctl list_queues name messages_ready messages_unacknowledged
参数 | 释义 |
---|---|
name | 队列名称 |
durable | 表示服务器重启之后,队列是否存活 |
auto_delete | 表示不再使用的队列是否自动被删除 |
arguments | 显示队列参数 |
policy | 显示应用在队列中的策略名称 |
pid | 表示和队列相关联的Erlang进程的ID |
owner_pid | 表示作为队列的排他所有者的连接的Erlang进程的ID,如果队列是非排他,则为空。 |
exclusive | 表示队列是否是排他的,有 owner_pid 返回 True,否则返回 False |
exclusive_consumer_pid | 表示排他消费者订阅该队列的频道的Erlang进程的ID,如果没有独家消费者,则为空。 |
exclusive_consumer_tag | 表示订阅该队列的排他消费者的消费tag。如果没有排他消费者,则为空。 |
messages_ready | 显示准备被发送到客户端的消息数量 |
messages_unacknowledged | 显示已经被发送到客户端但是还没有被确认的消息数量 |
messages | 表示准备发送和没有被确认的消息数量总和(队列深度) |
messages_ready_ram | 表示驻留在内存里的 messages_ready 的消息数量 |
messages_unacknowledged_ram | 表示驻留在内存里的 messages_unacknowledged 的消息数量 |
messages_ram | 表示驻留在内存里的消息总数 |
messages_persistent | 表示队列中持久消息的总数(对于临时队列,总是为0) |
message_bytes | 表示在队列中所有消息body的大小,这并不包括消息属性(包括header)或者任何开销 |
message_bytes_ready | 表示类似于 messge_bytes 但仅仅计算那些将发送到客户端的消息 |
message_bytes_unacknowledged | 表示类似于 message_bytes 但仅仅计算那些已经发送到客户还没有确认的消息 |
message_bytes_ram | 表示类似于 message_bytes 但仅仅计算那些驻留在内存中的消息 |
message_bytes_persistent | 表示类似于 message_bytes 但仅仅计算那些持久消息 |
head_message_timestamp | 表示队列中第一个消息的时间戳属性(如果存在)。只有处在 paged-in 状态的消息才存在时间戳。 |
disk_reads | 表示该队列自start起,从磁盘读取消息的次数总和 |
disk_writes | 表示该队列自start起,被写入磁盘消息的次数总和 |
consumers | 表示consumer的数量 |
consumer_utilisation | 表示队列能够立即将消息传递给消费者的时间分数(0.0 ~ 1.0之间),如果消费者受到网络拥塞或者预取计数的限制,该值可能小于1.0 |
memory | 表示和该队列相关联的Erlang进程消耗的内存字节数,包括stack/heap/内部数据结构 |
slave_pids | 表示该队列目前的slave的ID号(如果该队列被镜像的话) |
synchronised_slave_pids | 表示如果队列被镜像,给出与主队列同步的当前slave的ID号,即可以从主队列接管而不丢失消息的slave的ID |
state | 表示队列的状态,一般是 “running”; 如果队列正在同步,也可能是 “{syncing, MsgCount}”; 如果队列所处的节点当前down了,队列显示的状态为 “down” |
- 既然存在消费者消息确认机制,以确保消息处于什么状态从而进行有效的数据监控管理,但是这一步时消息已经到达rabbitmq服务的。那生产者把消息发送给rabbitmq这段时间内也同样需要监控确认消息状态,消息是否真正地发给rabbitmq里了。
- 若不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是说生产者是不知道消息有没有正确地到达服务器。rabbitmq针对这个问题,提供了两种解决方式:
- 通过事务机制实现。
- 通过发送方确认机制实现。
- 使用事务机制会降低rabbitmq的消息吞吐量,所以rabbitmq就提供了一个改进方案,发送方确认机制这才出现。
- rabbitmq客户端中与事务机制相关的方法有三个:channel.txSelect、channel.txCommit、channel.txRollback。
- channel.txSelect 用于将当前的信道设置成事务模式。
- channel.txCommit 用于提交事务。
- channel.txRollback 用于事务回滚。
- 在通过channel.txSelect 方法开启事务之后,当消息发送给rabbitmq时:
- 若事务提交成功,说明消息一定到达了rabbitmq中;
- 若事务提交执行之前由于rabbitmq异常崩溃或者其他原因抛出异常,此时就可以将其捕获,进而通过执行channel.txRollback 方法来实现事务回滚。
- 开启事务机制与不开启相比多了四个步骤。
- 第一步,客户端发送 Tx.Select,将信道置为事务模式。
- 第二步,rabbitmq broker回复 Tx.Select-ok,确认已将信道置为事务模式。
- 第三步,在发送完消息之后,客户端发送 Tx.Commit 提交事务。
- 第四步,rabbitmq broker回复 Tx.Commit-ok,确认事务提交。
- 第一步,客户端发送 Tx.Select,将信道置为事务模式。
- 第二步,rabbitmq broker回复 Tx.Select-ok,确认已将信道置为事务模式。
- 第三步,在发送完消息之后,客户端正准备发送 Tx.Commit 提交事务时,捕获到异常,客户端发送Tx.Rollback给rabbitmq broker开启事务回滚。
- 第四步,rabbitmq broker回复 Tx.Rollback-ok,确认事务回滚已开启。
- 第一步:生产者将信道设置成 confirm(确认) 模式。
- 第二步:在该信道上发布消息,每条消息都会被指派一个唯一的 ID(从1开始)。
- 第三步:当消息被投递到所有匹配的队列之后,rabbitmq会发送一个确认消息Basic.Ack给生产者, 确认消息中包含消息的唯一 ID。
- 若消息和队列是可持久化的,那么确认消息Basic.Ack会在消息落盘之后发出。
- 确认消息中有两个参数信息:
- deliveryTag参数, 包含了确认消息的序号,表示到这个序号之前的所有消息是否都已经处理。
- multiple参数,有true、false两个选项,表示消息是否得到了处理,和deliveryTag参数配合使用。
- 第四步:收到确认消息Basic.Ack,就知晓了消息已经正确到达了目的地。
- 如下图,有两个生产者,其所在地信道都已设置成confirm模式。
- 第一步,生产者发送信息。
- 生产者_1发送了3条消息,每条消息带有一个唯一ID,从1开始,所以就是1、2、3。
- 生产者_2发送了1条消息,每条消息带有一个唯一ID,从1开始,所以就是1。
- 第二步,4条消息都进入rabbimq broker,并发送确认消息给生产者_1和生产者_2。
- 给生产者_1发送的确认消息为,前3条消息都已被处理。
- 给生产者_2发送的确认消息为,前1条未被处理。
- 这两种方式是改良使用方式,是因为publisher confirm 的优势在于并不一定需要同步确认。
- 批量 confirm方法:每发送一批消息后,调用 channel.waitEorConfirms 方法,等待服务器的确认返回。
- 异步 confirm方法:提供一个回调方法,服务端确认了一条或者多条消息后客户端会回
- 事务机制和 publisher confirm 机制两者是互斥的,不能共存。
- 若将已开启事务模式的信道,再设置为 publisher confirm 模式,rabbitmq提示报错:{amgp error,preconditionfailed,“cannot switch from tx to confirm mode”,‘confirm.select’)。
- 多将已开启 publisher confirm 模式的信道,再设置为事务模式,rabbitmq提示报错:"cannot switch from confirm to tx{amqp error, precondition failed,‘tx.select’ }。mo de "
- 事务机制和 publisher confirm 机制确保的是消息能够正确地发送至rabbitmq交换器,如果此交换器没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有匹配的队列。
- 事务机制存在阻塞性,发送方确认机制不存在。
- 事务机制发送一条消息后,会使生产者阻塞,直到等待rabbitmq回应之后才能继续发送下一条消息。
- 发送方确认机制发送一条消息,生产者应用程序在等信道返回确认的同时可以继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息。
- 事务机制和普通confirm的方式占用服务器资源,导致吐吞量很低,编程方式简单。而批量confirm和异步confirm两种方式对服务器性能影响低,但是写代码会变得更为复杂。
- 事务机制和普通confirm的方式,不需要在客户端维护状态 (这里指的是维护 deliveryTag 及缓存未确认的消息)。
- 批量confirm方式弊端在于,遇到rabbitmq服务端返回 Basic.Nack 需要重发批量消息而导致的性能降低。
- 异步 confirm方式不断在于,编程模型最复杂,而且和批量 confirm 方式一样需要在客户端维护状态。(建议使用)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。