赞
踩
目录
RabbitMQ 保证可靠性的一条是消息的持久化,那消息时如何持久化的呢?
不管是持久化的消息还是非持久化的消息都可以被写入磁盘。
持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一个备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中消除。
非持久化的消息一般只保存在内存中,在内存吃紧的时候再被换入到磁盘中,以节省内存空间。这两种类型的消息落盘处理都在 RabbitMQ 的持久层完成的。
持久层是一个逻辑概念,实际包括两部分:
rabbitI_queue_index 负责维护队列中落盘的消息,包括消息的存储地点、是否已被交付给消费者、是否已被消费者 ack 等。每个队列中都有一个与之对应的 rabbit_queue_index。
rabbit_msg_store 以键值对的形式存储信息,它被所有队列共享,在每个节点中有且只有一个。在细分的话分为 msg_store_persistent 和 msg_store_transient,msg_store_persistent 负责持久化消息的持久化,重启后消息不会丢失;msg_store_transient 负责非持久化消息的持久化,重启后消息会丢失。
最佳的配备是较小的消息存储在 rabbit_queue_index 中,而较大的消息存储在rabbit_msg_store 中。这个消息的大小界定可以通过 queue_index_embed_msgs_below 来设置,默认大小是4096B。这里的消息大小指消息体、属性及headers整体的大小,小于阈值可以存储在rabbit_queue_index中,这样可以得到性能的优化。
rabbit_queue_index 中以顺序的段文件来进行存储,后缀为.idx,每个段文件中包含固定的SEGMENT_ENTRY_COUNT 条记录,默认值是16384。
通常队列由 rabbit_amqqueue_process 和 backing_queue 这两部分组成。
如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会经过队列这一步。而当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递。
消息存入队列后,不是固定不变的,他会随着系统的负载在队列中不断的流动,消息的状态会不断发生变化。RabbitMQ 中队列的消息有四种状态。
对于持久化的消息,消息内容和消息索引都必须保存在磁盘上才会处于上述状态中的一种。而 gamma 状态的消息是只有持久化的消息才会有的状态。
RabbitMQ 在运行时会根据统计的消息传送速度定期计算一个当前内存中能够保存的最大消息数量。如果 alpha 状态的消息数量大于此值时,就会引起消息的状态转换,多余的消息会转换成其他状态。
区分这四种状态的主要原因是满足不同的内存和 CPU 需求。alpha 状态最耗内存,但很少消耗 CPU。delta 状态基本不消耗内存,但是需要消耗更多的 CPU 和磁盘 I/O 操作。delta 需要执行两次 I/O 才能读取到消息,一次是读取消息索引,一次是读取消息内容。
RabbitMQ 从3.6开始引入了惰性队列的概念。惰性队列会尽可能将消息存入磁盘中,而在消费者消费到对应的消息时才会被加载到内存中,他的一个重要设计目标是能够支持更长的队列,即支持更多的消息存储。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更快的将消息发送给消费者。
惰性队列的设置
- Map<String, Object> args = new HashMap<>();
- args.put("x-queue-mode", "lazy");
- Queue queue = new Queue(vodQueue, true, false, false, args);
RabbitMQ 可以对内存和磁盘使用设置阈值,当达到阈值时,生产者将被阻塞(block),直到恢复正常。
除了这两个阈值,RabbitMQ 从2.8.0版本开始还引入了流控( Flow Control)机制来确保稳定性。流控的机制是用来避免消息的发送速率过快而导致服务器难以支撑的情形。内存和磁盘告警相当于全局的流控(Global Flow Control),一旦触发会阻塞集群中所有的 Connection,而本节的流控是针对单个 Connection 的。
Erlange 进程之间并不共享内存(binary类型除外),而是通过消息传递来通信,每个进程都有一个自己的进程邮箱。默认情况下,Erlange 并没有对进程邮箱的大小进行限制,所以当大量消息持续发往某个进程时,会导致该进程邮箱过大,最终内存溢出并奔溃。在 RabbitMQ 中,如果生产者持续高速发送,而消费者的消费速度较低时,如果没有流控,很快就会使内部进程邮箱的大小达到阈值。
RabbitMQ 使用了一种基于信用证算法(credit-base algorithm)的流控机制来限制发送消息的速率以解决前面提到的问题。他通过监控各个进程的进程邮箱,当某个进程的负载过高而来不及处理消息时,这个进程的进程邮箱开始堆积消息。当堆积到一定量时,就会阻塞而不接收上游的新消息。从而慢慢的上游进程的进程邮箱也开始堆积。当堆积到一定数量时也会阻塞而停止接收上游的消息,最后就会使负责网络数据包接收的进程 阻塞而暂停接受新的数据。
进程 A 接收消息并转发至进程 B,进程 B 接收消息并转发给进程 C。每个进程中都有一对关于收发消息的 credit 值。以进程 B 为例,{{credit_from, C}, value} 表示能发送多少条消息给 C,每发一条消息该值减 1,当为 0 时,进程 B 不再往进程 C 发送消息,也不再接收进程 A 的消息。
{{credit_to, A}, value} 表示在接收多少条消息就向进程A发送增加 credit 值的通知,进程 A接收到该通知后就增加{{credit_from, B}, value} 所对应的值,这样进程 A 就能持续的发送消息。当上游发送速率高于下游接收速率时,credit 值就会逐渐耗光,这时进程就会阻塞,阻塞的情况会一直传递到最上游。
当上游进程接收到增加 credit 值的通知时,若此时上游进程处于阻塞状态则解除阻塞,开始接收更上游的消息,一个个传导最终能够解除最上游阻塞状态。由此可知基于信用证算法的流控机制最终将消息的发送频率限制在消息处理进程的处理能力范围内。
一个链接(connection)触发流控时会处于“flow”状态,也就意味着这个 connection 的状态每秒在 blocked 和 unblocked 之间来回切换数次,这样可以将消息发送的速率控制在服务器能够支撑的范围内。
流控机制不仅仅作用于 connection,同样作用于信道和队列。从 connection 到 channel,再到队列,最后是消息持久化存储形成一个完整的流控链,对于整个流控链中的任意进程,只要该进程阻塞,上游的进程必然全部阻塞。也就是说,如果某个进程达到性能瓶颈,必然会导致上游所有的进程被阻塞。
综上所述,RabbitMQ 的流控机制既包括了客户端层面的信用控制,也包括了服务器层级的资源限制控制,共同确保了消息传输的稳定性和系统的可持续运行。
往期经典推荐
探秘 RabbitMQ 的设计理念与核心技术要点-CSDN博客
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。