赞
踩
基于前两次的分享会,结合rabbitmq相关知识,做一个小结。说明一致性的设计思想,在此说明相关的基础理论。
CAP定理:
在计算机科学里,CAP定理又被称作布鲁尔定理(Brewer theorem)。他认为对于一个分布式计算机系统来说,不可能同时满足以下三点。
可用性计算公式:A = Uptime/(Uptime+ Downtime)。Uptime是可用时间,Downtime是不可用时间
3.分区容错性(Partition tolerance)
在大型分布式系统实践中,分布式意味着必须满足分区容错性,也是P。为了追求更高的可用性,在一致性上会做一定的妥协,通常会选择AP。CAP定理如下图1所示:
图片1
BASE理论
基于对大规模分布式系统的实践总结,eBay的架构师Dan Pritchett在ACM(国际计算机学会)上发表文章提出了BASE理论,BASE理论是对CAP定理的延伸。
BASE理论的核心思想是:如果无法做到强一致性,或者做到强一致性要付出很大的代价,那么应用可以根据自身业务特点,采用适当的方式来使系统达到最终一致性,只要对最终的用户没有影响,或者影响是可接受的即可。
Quorum机制(NWR 模型)
在分布式场景中:如果多个服务分别向三个节点写数据,为保持强一致,就必须要求三个节点全部成功才返回。这样在读的时候可以读任意节点,就不会有不一致的情况了。但是,同步写三个节点的性能较低,如果换一个思路,一致性并不一定要在写数据的时候完成,可以在读的阶段决策,只要每次读到最新的版本就可以了。这就是Quorum机制的核心思想。
简单来说,Quorum机制就是要满足公式:W+R>N,式中N代表备份个数,W代表要写入至少W份才认为成功,R表示至少读取R个备份。这个公式把选择权交给了业务用户,让用户来做出最终决策。
NWR原理,如下图2所示,两个进程同时往3个节点写数据,v1表示版本1,v2表示版本2,如何保证每次读取都至少读到一个最新的版本呢?
图片2
假设N=3, W=1, R=1,W+R<N,在节点1写入,节点2读取,无法读取到最新的数据
假设N=3, W=1,R=2,W+R=N,在节点1写入,节点2、3读取,无法读到最新的数据
假设N=3,W=2,R=2,W+R>N,写入任意两个节点,读取任意两个节点,一定会读取到最新版本的数据。
假设N=3,W=3,R=1,W+R>N,同时写入所有节点,则读取任意节点就可以得到最新的数据。
当R=1且W=N时,适合读多写少的场景,读操作是最优的。当W=1且R=N,适合写多读少的场景,可以得到非常快的写操作。
但是还存在一个写入的冲突问题:假设N=3,W=1,一共有三个节点,只要写入一个就认为成功。如果第一次写入A节点,对变量a进行减1操作,变量a在A节点上由10变成了9,记录版本为v2,变量a还没来得及同步到另外两个节点上,a在B、C节点上的值还是10;第二次写入到B节点,同样对B节点进行减1操作,变量a在B上的值变为9,版本为v3,根据W+R>N规则,要读取三个节点,同时得到了v2,v3版本的数据,这时候就需要合并数据,处理冲突。由于v3版本更新,就回覆盖v2版本,结果a=9,但是实际上执行了两次减1操作。
如何要解决这个问题,,就要同时满足公式W>N/2。这样能保证每次写入都能和上次写入有交集,变成一个乐观锁,只有超过半数写成功才算成功。
Aurora是AWS的分布式关系型数据库,它的存储层就是基于Quorum协议的,除满足W+R>N外,还有一个要求是W>N/2。Aurora会部署在三个AZ(AvailablityZone)上,每个AZ包含了2个副本,总共6个副本,至少4个实例写成功才算成功,至少读三份数据。这样即使任意一个AZ不可使用,还有4个副本,不会丢失数据,不影响读写。此外,当任意一个AZ不可用的同时,另外两个AZ中的一个副本节点也不可用了,这样只会影响写,不会影响读。由于在同一个机房通过备份恢复一个副本节点会很快,Aurora采用万兆网卡,可以在10s恢复一个10GB副本。
租约机制(Lease)
一种是采用投票机制(Paxos算法);另一种则是采用租约机制-Lease(参考etcd)
状态机(Replicated State Machine)
在分布式环境下,如果要保证更高的可用性,更好的容错性,就必须建立多个副本。一个简单的办法是执行相同的命令,也就是说在每个数据中心都有相同的初始状态,执行相同命令,并且命令的顺序保持一致,这样就可以保证最终数据也相同了。
Replicated State Machine的一个典型应用案例就是MySQL同步,因为MySQL本身只支持Master-Slave结构。假设我们想同时写入三个节点,我们就可以在写入MySQL之前先通过Proxy写log,三个节点的Proxy之间通过Raft选取Leader,Leader接受请求,然后通过log实现各个replica(副本)的同步。这样可以保证读取到最新的数据
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
Erlang分布式并发编程:
Erlang 并发编程是在同一 Erlang 虚拟机上创建多个进程来实现的。而Erlang 的分布式编程可以通过在不同主机上的Erlang 虚拟机(可以运行在同一主机中或可以互相通过网络主机上)实现,其中运行Erlang 虚拟机的主机也称作Erlang 的节点。在Erlang 中,可以实现在一个Erlang 虚拟机上远程在另一个Erlang 虚拟机上创建新的工作的进程,然后利用其消息传递机制,将要计算的数据发送到另一虚拟机上的工作进程,在另一虚拟机中计算后,再使用消息传递机制将计算结果发送回来;也可以实现在一个Erlang虚拟机上以远程调用的方式在远程虚拟机上进行数据处理,并直接返回处理结果。
Erlang语言中与分布式编程有关的几个重要函数语法如下:
1.{pname,nodename}!Message
实现不同Erlang节点间的消息发送,其中pname是远程节点进程的注册名,nodename是远程节点名。
2.spawn(nodename,modulename,fun,[arg1,arg2..])
实现在远程节点上启动一个进程,与spawn/3相比只是多了个节点名……
3.rpc:call(nodename,modulename,fun,[arg1,arg2..])
实现远程调用的函数,它是Erlang标准库中的模块rpc中的函数,其参数与2中相同。
4.node()
返回本地节点名称
Erlang节点的启动方法:
1.在同一主机上启动多个Erlang节点命令行格式如下:
erl -sname nodename
使用短名称的形式启动一个名称为nodename的节点,其节点名称为启动后Erlang命令提示符所示,比如Erlang命令提示符为:
(ndb@USER-20170714FN)2>
则其节点名称为:ndb@USER-20170714FN。
2.在可以互相通信的主机上分别启动Erlang节点命令行格式如下:
erl -name nodename -setcookie abc
与1大致相同,区别是使用参数设置了节点的cookie,如果要实现节点间的通信,就需要多个节点的cookie的值完全相同。而1中在同一主机中,会自动设置为相同的cookie,所以没有进行显式的设置。
以下举一简单实例:
-module(nodetest).
-compile(export_all).
remotes() ->
receive
{From,Data} ->
From!"Remote recved:" ++ Data,
io:format("~s~n",[Data])
end,
remotes().
recv() ->
receive
Data -> io:format("~s~n",[Data])
end,
recv().
其中定义了一个remotes/0函数,可以将收到的消息输出后发回对方;recv/0函数只是简单的接收消息并输出。
从生产—消费—以及服务端这三个方面来简单说说 Rabbitmq,如下图2-1-1所示:
图2-1-1所示
一条消息经由生产者producer发出,然后转交到交换器exchange,然后又exchange路由到队列中来做存储。消费者连接队列直接从队列中读取消息。我们这举一个例子: 寄件人producer投递一个包裹到邮局exchange。有邮局负责帮忙投递到目的地queue中,之后再由收件人consumer拆件,这一套组织逻辑其实就是AMAQP协议的一套逻辑,注意RabbitMq中的exchange并不是一个处理模块的进程,而仅仅类似一张路由表而已。
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。 Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。
RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。
多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。
这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑…
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。
前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。
Exchange是按照什么逻辑将消息路由到Queue的?这个将在Binding一节介绍。 RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,这将在Exchange Types一节介绍。
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。 在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。 RabbitMQ为routing key设定的长度限制为255 bytes。
RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。 在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。 binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),下面分别进行介绍。
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key与routing key一样也是句点号“. ”分隔的字符串
binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。 该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。
MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
RabbitMQ中实现RPC的机制是:
客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14种properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
服务器端收到消息并处理
服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。
因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。下面先来看下RabbitMQ集群的整体方案:
上面图中采用三个节点组成了一个RabbitMQ的集群,Exchange A的元数据信息在所有节点上是一致的,而Queue(存放消息的队列)的完整数据则只会存在于它所创建的那个节点上。,其他节点只知道这个queue的metadata信息和一个指向queue的owner node的指针。
RabbitMQ集群会始终同步四种类型的内部元数据(类似索引): a.队列元数据:队列名称和它的属性; b.交换器元数据:交换器名称、类型和属性; c.绑定元数据:一张简单的表格展示了如何将消息路由到队列; d.vhost元数据:为vhost内的队列、交换器和绑定提供命名空间和安全属性; 因此,当用户访问其中任何一个RabbitMQ节点时,通过rabbitmqctl查询到的queue/user/exchange/vhost等信息都是相同的。
想要实现HA方案,那将RabbitMQ集群中的所有Queue的完整数据在所有节点上都保存一份不就可以了么?(可以类似MySQL的主主模式嘛)这样子,任何一个节点出现故障或者宕机不可用时,那么使用者的客户端只要能连接至其他节点能够照常完成消息的发布和订阅嘛。
设计主要还是基于集群本身的性能和存储空间上来考虑。
第一,存储空间,如果每个集群节点都拥有所有Queue的完全数据拷贝,那么每个节点的存储空间会非常大,集群的消息积压能力会非常弱(无法通过集群节点的扩容提高消息积压能力);
第二,性能,消息的发布者需要将消息复制到每一个集群节点,对于持久化消息,网络和磁盘同步复制的开销都会明显增加。
RabbitMQ集群的工作原理图如下:
场景1:客户端直接连接队列所在节点
如果有一个消息生产者或者消息消费者通过amqp-client的客户端连接至节点1进行消息的发布或者订阅,那么此时的集群中的消息收发只与节点1相关,这个没有任何问题;如果客户端相连的是节点2或者节点3(队列1数据不在该节点上),那么情况又会是怎么样呢?
场景2:客户端连接的是非队列数据所在节点
如果消息生产者所连接的是节点2或者节点3,此时队列1的完整数据不在该两个节点上,那么在发送消息过程中这两个节点主要起了一个路由转发作用,根据这两个节点上的元数据(也就是上文提到的:指向queue的owner node的指针)转发至节点1上,最终发送的消息还是会存储至节点1的队列1上。
同样,如果消息消费者所连接的节点2或者节点3,那这两个节点也会作为路由节点起到转发作用,将会从节点1的队列1中拉取消息进行消费。
在此提个问题:如果客户端连接的是非队列数据所在节点,但是队列所在节点挂掉了,情况如何?接着往下看,用到了镜像队列
RabbitMQ从3.6.0版本开始引入了惰性队列(Lazy Queue)的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。
惰性队列会将接收到的消息直接存入文件系统中,而不管是持久化的或者是非持久化的,这样可以减少了内存的消耗,但是会增加I/O的使用,如果消息是持久化的,那么这样的I/O操作不可避免,惰性队列和持久化消息可谓是“最佳拍档”。注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。
队列具备两种模式:default和lazy。默认的为default模式,在3.6.0之前的版本无需做任何变更。lazy模式即为惰性队列的模式,可以通过调用channel.queueDeclare方法的时候在参数中设置,也可以通过Policy的方式设置,如果一个队列同时使用这两种方式设置的话,那么Policy的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
对应的Policy设置方式为:
rabbitmqctl set_policy Lazy "^myqueue$" '{"queue-mode":"lazy"}' --apply-to queue
惰性队列和普通队列相比,只有很小的内存开销。这里很难对每种情况给出一个具体的数值,但是我们可以类比一下:当发送1千万条消息,每条消息的大小为1KB,并且此时没有任何的消费者,那么普通队列会消耗1.2GB的内存,而惰性队列只消耗1.5MB的内存。
据官网测试数据显示,对于普通队列,如果要发送1千万条消息,需要耗费801秒,平均发送速度约为13000条/秒。如果使用惰性队列,那么发送同样多的消息时,耗时是421秒,平均发送速度约为24000条/秒。出现性能偏差的原因是普通队列会由于内存不足而不得不将消息换页至磁盘。如果有消费者消费时,惰性队列会耗费将近40MB的空间来发送消息,对于一个消费者的情况,平均的消费速度约为14000条/秒。
如果要将普通队列转变为惰性队列,那么我们需要忍受同样的性能损耗。当转变为惰性队列的时候,首先需要将缓存中的消息换页至磁盘中,然后才能接收新的消息。反之,当将一个惰性队列转变为普通队列的时候,和恢复一个队列执行同样的操作,会将磁盘中的消息批量的导入到内存中。
假想概述:
如果RabbitMQ集群只有一个broker节点,那么在该节点的失效将导致整个服务临时性的不可用,并且可能会导致message的丢失(尤其是在非持久化message存储于非持久化queue中的时候)。当然可以将所有的publish的message都设置为持久化的,并且使用持久化的queue,但是这样仍然无法避免由于缓存导致的问题:因为message在发送之后和被写入磁盘并执行fsync之间存在一个虽然短暂但是会产生问题的时间窗。通过publisher的confirm机制能够确保客户端知道哪些message已经存入磁盘,尽管如从,一般不希望遇到因单点故障导致的服务不可用。
如果rabbitmq集群是由多个broker节点构成的,那么从服务的整体可用性上来讲,该集群对于单点失效是有弹性的,但是同时也需要注意: 尽管exchange和binding能够在单点失效问题上幸免于难,但是queue和其上持有的message却不行,这是因为queue及其内容仅仅储存于单个节点之上,所以一个节点的失效表现为其对应的queue不可用。
引入Rabbitmq的镜像队列机制,将queue镜像到cluster中其他的节点之上。在该实现下,如果其中的一个节点失效了,queue能自动地切换到镜像中的另一个节点以保证服务的可用性。在通常的用法中,针对每一个镜像队列都包含一个master和多个slave,分别对用于不同的节点。Slave会准确地按照master执行命令的顺序进行命令执行,故slave与master上维护的状态应该是相同的。除了publish外所有动作都之后会向master发送,然后由master将命令执行的结果广播给slave们,故看似从镜像队列中消费操作实际上是在master上执行的的。
一旦完成了选中的slave被提升为master的动作,发送到镜像队列的message将不会在丢失:publish到镜像队列的所有消息总是被直接publish到master和所有的slave之上。这样一旦master失效了,message仍然可以继续发送到其他slave上。
Rabbitmq的镜像队列同时支持publisher, confirm和事物两种机制。在事物机制中,只有当前事物在全部镜像queue中执行之后,客户端才会收到Tx.CommitOK的消息。同样的,在publisher confirm机制中,向publish进行当前message确认的前提是该messsage被全部镜像所接受了
镜像队列设置:
镜像队列的配置通过添加policy完成,policy添加的命令为:
1.rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级
例如,对队列名称以“queue_”开头的所有队列进行镜像,并在集群的两个节点上完成进行,policy的设置命令为:
1 rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
也可以通过RabbitMQ的web管理界面设置:
普通MQ的结构:
通常队列由两部分组成:一部分是AMQQueue,负责AMQP协议相关的消息处理,即接收生产者发布的消息、向消费者投递消息、处理消息confirm、acknowledge等等;另一部分是BackingQueue,它提供了相关的接口供AMQQueue调用,完成消息的存储以及可能的持久化工作等。
在RabbitMQ中BackingQueue又由5个子队列组成:Q1, Q2, Delta, Q3和Q4。RabbitMQ中的消息一旦进入队列,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的不断发生变化。与这5个子队列对于,在BackingQueue中消息的生命周期分为4个状态:
Alpha:消息的内容和消息索引都在RAM中。Q1和Q4的状态。
Beta:消息的内容保存在DISK上,消息索引保存在RAM中。Q2和Q3的状态。
Gamma:消息内容保存在DISK上,消息索引在DISK和RAM都有。Q2和Q3的状态。
Delta:消息内容和索引都在DISK上。Delta的状态。
注意:对于持久化的消息,消息内容和消息所有都必须先保存在DISK上,才会处于上述状态中的一种,而Gamma状态的消息是只有持久化的消息才会有的状态。
上述就是RabbitMQ的多层队列结构的设计,我们可以看出从Q1到Q4,基本经历RAM->DISK->RAM这样的过程。这样设计的好处是:当队列负载很高的情况下,能够通过将一部分消息由磁盘保存来节省内存空间,当负载降低的时候,这部分消息又渐渐回到内存,被消费者获取,使得整个队列具有很好的弹性。下面我们就来看一下,整个消息队列的工作流程。
引起消息流动主要有两方面因素:其一是消费者获取消息;其二是由于内存不足引起消息换出到磁盘。RabbitMQ在系统运行时会根据消息传输的速度计算一个当前内存中能够保存的最大消息数量(Target_RAM_Count),当内存中的消息数量大于该值时,就会引起消息的流动。进入队列的消息,一般会按照Q1->Q2->Delta->Q3->Q4的顺序进行流动,但是并不是每条消息都一定会经历所有的状态,这个取决于当前系统的负载状况。
当消费者获取消息时,首先会从Q4队列中获取消息,如果Q4获取成功,则返回。如果Q4为空,则尝试从Q3获取消息,首先系统会判断Q3是否为空,如果为空则返回队列为空,即此时队列中无消息(后续会论证)。如果不为空,则取出Q3的消息,然后判断此时Q3和Delta队列的长度,如果都为空,则可认为Q2、Delta、Q3、Q4全部为空(后续会论证),此时将Q1中消息直接转移到Q4中,下次直接从Q4中获取消息。如果Q3为空,Delta不为空,则将Delta转移到Q3中,如果Q3不为空,则直接下次从Q3中获取消息。在将Delta转移到Q3的过程中,RabbitMQ是按照索引分段读取的,首先读取某一段,直到读到的消息非空为止,然后判断读取的消息个数与Delta中的消息个数是否相等,如果相等,则断定此时Delta中已无消息,则直接将Q2和刚读到的消息一并放入Q3中。如果不相等,则仅将此次读取到的消息转移到Q3。这就是消费者引起的消息流动过程。
消息换出的条件是内存中保存的消息数量+等待ACK的消息的数量>Target_RAM_Count。当条件出发时,系统首先会判断如果当前进入等待ACK的消息的速度大于进入队列的消息的速度时,会先处理等待ACK的消息。
最后我们来分析一下前面遗留的两个问题,一个是为什么Q3队列为空即可以认定整个队列为空。试想如果Q3为空,Delta不空,则在Q3取出最后一条消息时,Delta上的消息就会被转移到Q3上,Q3空矛盾。如果Q2不空,则在Q3取出最后一条消息,如果Delta为空,则会将Q2的消息并入到Q3,与Q3为空矛盾。如果Q1不为空,则在Q3取出最后一条消息,如果Delta和Q3均为空时,则将Q1的消息转移到Q4中,与Q4为空矛盾。这也解释了另外一个问题,即为什么Q3和Delta为空,Q2就为空。
通常在负载正常时,如果消息被消费的速度不小于接收新消息的速度,对于不需要保证可靠不丢的消息极可能只会有Alpha状态。对于durable=true的消息,它一定会进入gamma状态,若开启publish confirm机制,只有到了这个阶段才会确认该消息已经被接受,若消息消费速度足够快,内存也充足,这些消息也不会继续走到下一状态。
通常在系统负载较高时,已接受到的消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,增加处理每个消息的平均开销。因为要花更多的时间和资源处理“积压”的消息,所以用于处理新来的消息的能力就会降低,使得后来的消息又被积压进入很深的队列,继续加大处理每个消息的平均开销,这样情况就会越来越恶化,使得系统的处理能力大大降低。
4.2.2.1 整体介绍
通常队列由两部分组成:一部分是amqqueue_process,负责协议相关的消息处理,即接收生产者发布的消息、向消费者投递消息、处理消息confirm、acknowledge等等;另一部分是backing_queue,它提供了相关的接口供amqqueue_process调用,完成消息的存储以及可能的持久化工作等。
镜像队列同样由这两部分组成,amqqueue_process仍旧进行协议相关的消息处理,backing_queue则是由master节点和slave节点组成的一个特殊的backing_queue。master节点和slave节点都由一组进程组成,一个负责消息广播的gm,一个负责对gm收到的广播消息进行回调处理。在master节点上回调处理是coordinator(协调员),在slave节点上则是mirror_queue_slave。mirror_queue_slave中包含了普通的backing_queue进行消息的存储,master节点中backing_queue包含在mirror_queue_master中由amqqueue_process进行调用。
注意:消息的发布与消费都是通过master节点完成。master节点对消息进行处理的同时将消息的处理动作通过gm广播给所有的slave节点,slave节点的gm收到消息后,通过回调交由mirror_queue_slave进行实际的处理。
4.2.2.2 gm(Guaranteed Multicast)
传统的主从复制方式:由master节点负责向所有slave节点发送需要复制的消息,在复制过程中,如果有slave节点出现异常,master节点需要作出相应的处理;如果是master节点本身出现问题,那么slave节点间可能会进行通信决定本次复制是否继续。当然为了处
理各种异常情况,整个过程中的日志记录是免不了的。然而rabbitmq中并没有采用这种方式,而是将所有的节点形成一个循环链表,每个节点都会监控位于自己左右两边的节点,当有节点新增时,相邻的节点保证当前广播的消息会复制到新的节点上;当有节点失效时,相邻的节点会接管保证本次广播的消息会复制到所有节点。
在master节点和slave节点上的这些gm形成一个group,group的信息会记录在mnesia中。不同的镜像队列形成不同的group。消息从master节点对应的gm发出后,顺着链表依次传送到所有节点,由于所有节点组成一个循环链表,master节点对应的gm最终会收到自己发送的消息,这个时候master节点就知道消息已经复制到所有slave节点了。
4.2.2.3 重要的表结构
rabbit_queue表记录队列的相关信息:
-record(q, {q, %% 队列信息数据结构amqqueue
exclusive_consumer, %% 当前队列的独有消费者
has_had_consumers, %% 当前队列中是否有消费者的标识
backing_queue, %% backing_queue对应的模块名字
backing_queue_state, %% backing_queue对应的状态结构
consumers, %% 消费者存储的优先级队列
expires, %% 当前队列未使用就删除自己的时间
sync_timer_ref, %% 同步confirm的定时器,当前队列大部分接收一次消息就要确保当前定时器的存在(200ms的定时器)
rate_timer_ref, %% 队列中消息进入和出去的速率定时器
expiry_timer_ref, %% 队列中未使用就删除自己的定时器
stats_timer, %% 向rabbit_event发布信息的数据结构状态字段
msg_id_to_channel, %% 当前队列进程中等待confirm的消息gb_trees结构,里面的结构是Key:MsgId Value:{SenderPid, MsgSeqNo}
ttl, %% 队列中设置的消息存在的时间
ttl_timer_ref, %% 队列中消息存在的定时器
ttl_timer_expiry, %% 当前队列头部消息的过期时间点senders, %% 向当前队列发送消息的rabbit_channel进程列表
dlx, %% 死亡消息要发送的exchange交换机(通过队列声明的参数或者policy接口来设置)
dlx_routing_key, %% 死亡消息要发送的路由规则(通过队列声明的参数或者policy接口来设置)
max_length, %% 当前队列中消息的最大上限(通过队列声明的参数或者policy接口来设置)
max_bytes, %% 队列中消息内容占的最大空间
args_policy_version, %% 当前队列中参数设置对应的版本号,每设置一次都会将版本号加一
status %% 当前队列的状态
}).
注意:slave_pids的存储是按照slave加入的时间来排序的,以便master节点失效时,提升
gm_group表记录gm形成的group的相关信息:%% 整个镜像队列群组的信息,该信息会存储到Mnesia数据库
-record(gm_group, { name, %% group的名称,与queue的名称一致
version, %% group的版本号, 新增节点/节点失效时会递增
members %% group的成员列表, 按照节点组成的链表顺序进行排序
}).
view_member记录当前GM群组中真实的视图的数据结构:
%% 镜像队列群组视图成员数据结构
-record(view_member, { id, %% 单个镜像队列(结构是{版本号,
该镜像队列的Pid})
aliases, %% 记录id对应的左侧死亡的GM进程列表
left, %% 当前镜像队列左边的镜像队列(结构是{版本号,该镜像队列的Pid})
right %% 当前镜像队列右边的镜像队列(结构是{版本号,该镜像队列的Pid})
}).
记录单个GM进程中信息的数据结构:
%% 记录单个GM进程中信息的数据结构
-record(member, { pending_ack, %% 待确认的消息,也就是已发布的消息缓存的地方
last_pub, %% 最后一次发布的消息计数
last_ack %% 最后一次确认的消息计数
}).
当消息大量堆积的时候,首先大量堆积就以为大量的消息都得不到及时处理,时延问题可以忽略;其次,也不用考虑预取个数设置的过大而造成的其他消费者空闲,因为大家都一直都有消息在流入处理。消息堆积可能会引起换页而性能下降,或者更槽糕的是触发内存告警而阻塞所有的生产者。
消息堆积治理方案如下:
丢弃策略: 设置消息保留时间或者保留大小(可以是个数或者占用大小),超时就丢弃
Lazy Queue
“移花接木”
第一种方案是丢弃策略,即当消息超过预定的保留时间以及当前消息堆积的个数或者是内存占用而选择丢弃数据,这个可以类比于kafka的日志保留策略,当然这种情况适合于消息可靠性要求不高、可丢弃的场景。又比如Java线程池的饱和策略中就有一种是丢弃策略。Rabbitmq本身并没有提供相应的配置和功能,这个需要外部平台的包装。
第二种方案是前面所提及的惰性队列,完全采用磁盘的空间来极大的增加堆积的能力(一般情况下,一条服务器的磁盘容量比内存容量要大得多的多)
第三种方案,我们把它称之为“移花接木”,下面看一张图,
当某个队列中的消息严重堆积时,举例:当前运行的集群cluster1中队列queue1的消息个数超过2kw或者占用内存大小超过10GB,就可以启用shovel1将队列queue1中的消息转发至备份集群cluster2中的队列queue2中,这样可以分摊堆积的压力;当检测到队列queue1中的消息个数低于100w或者消息占用大小低于1GB时就停止shovel1,然后让原本队列queue1中的消费者慢慢处理剩余的堆积;当检测到队列queue1中的消息个数低于100w或者消息占用大小低于1GB时就开启shovel2将队列queue2中暂存的消息返还给队列queue1;当检测到队列queue1中的消息个数超过100w或者消息占用大小超过1GB时就将shovel2停掉,经过一个周期之后,再开启shovel2,超过阀值时就停掉,如此反复多次直到将队列queue2中的消息清空为止。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。