赞
踩
RabbitMq用Erlang语言开发的基于AMQP协议开源实现的
AMQP全称Advanced Message Queue,高级消息队列协议,它是应用层协议的一个开发标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件,不受产品和开发语言的条件限制,
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性,扩展性,高可用性等方面有表现不俗表现
1.异步处理:把数据当作消息是放入消息中间将中,等需要的时候再去处理
2.并发业务:如电商的抢购业务,极短时间内大量的请求访问到接口.从而造成系统负载严重,像这类业务使用消息队列,当存入消息队列的消息满了,就拒绝后续的请求,使其让请求不进入后面的业务代码
3.耗时长的业务:如商城生成订单,生成订单牵涉的业务量多,从而业务处理生成时间比较缓慢,但是订单的生成又没有实时性的要求,就可以使用MQ进行异步处理
4.耦合度高的业务: 一个服务的业务代码需要增加一些新的需求,而且所需要牵涉的业务模块比较多,如果在在一个服务增加这些需求代码,难免代码过于耦合,此场景就可使用MQ解偶
AMQP高级消息队列协议,定义:Advanced Message Queueing Protocol是面向消息的中间件的开放标准应用层协议,AMQP的特征是消息导向,排队,路由(包括点对点和发布和订阅),可靠性和安全性。
AMQP要求消息传递提供商和客户端的行为在不同供应商实现可互操作的情况下,以与SMTP,HTTP,FTP等相同的方式创建了可互操作的系统。
AMQP协议是具有现代特征的二进制协议。一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开发标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
AMQP是一种二进制应用层协议,旨在有效地支持各种消息应用和通信模式。
AMQP协议图
Server
又称作Broker,用于接受客户端的连接,实现AMQP实体服务;
Connection
连接,应用程序与Broker的网络连接;
Channel
网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务;
Message
消息,服务器和应用程序之间传送的数据,有Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体内容,即我们要传输的数据;
仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。
Virtual Host
虚拟地址,是一个逻辑概念,用于进行逻辑隔离,是最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue;
Virtual Host是权限控制的最小粒度;
Exchange
交换机,用于接收消息,可根据路由键将消息转发到绑定的队列;
Binding:
Exchange和Queue之间的虚拟连接,Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定
Routing Key:
一个路由规则,虚拟机可用它来确定如何路由一个特定的消息;
Queue:
也称作Message Queue,即消息队列,用于保存消息并将他们转发给消费者;
RabbitMQ整体架构图
流程图
发送消息流程
1.生产者连接到Broker 建立一个连接,然后开启一个信道
2.接着生产者声明一个交换器 ,并设置相关属性,比如交换机类型、是否持久化、是否自动删除、是否内置等
3.生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除、消息最大过期时间、消息最大长度、消息最大字节数等
4.生产者通过路由键将交换器和队列绑定起来
5.生产者发送消息至Broker ,发送的消息包含消息体和含有路由键、交换器、优先级、是否持久化、过期时间、延时时间等信息的标签
6.相应的交换器根据接收到的路由键查找相匹配的队列如果找到 ,则将从生产者发送过来的消息存入相应的队列中
7.如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者
8.关闭信道
9.关闭连接
接收消息流程
1.消费者连接到Broker ,建立一个连接,开启一个信道
2.消费者向 RabbitMQ Broker 请求消费相应队列中的消息,在这个过程中可能会设置消费者标签、是否自动确认、是否排他等
3.等待 RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
4.消费者确认接收到的消息
5.RabbitMQ从队列中删除相应己经被确认的消息
6.关闭信道
7.关闭连接。
1.fanout模式(默认使用)
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中
图中生产者发送消息到Exchange,都会路由带图中两个Queue,最终被两个消费者消费
2.direct模式
dirct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与 routing key完全匹配的Queue中,如图,生产routung key=key1的时候,只绑定key1的queue才能收到消息
3.topic模式
topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“image.new.profile”.
binding key与routing key一样也是句点号“. ”分隔的字符串
binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配下一个据点前的所有字符,“#”用于匹配所有字符,包括句点(可以是零个)
如图,生产者以routing key为image.new.profile发布消息,这key可以被image.*.profile以及image.#匹配到,所有这两个队列都可以收到消息。由此可见,topic的路由方式更加灵活。
4.headers模式
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。如果设置为any,意思就是只要匹配到了headers表中的任何一对键值即可,all则代表需要全部匹配。
RabbitMQ完全实现AMQP协议,类型邮箱功能,Exchange负责Exchange type和Routing key 将消息投递到对应的消息队列中,消息队列负责在消费者获取消息前暂存消息,在Rabbit MQ中 messageQueue主要由两部分组成,一个AMQQueue,主要负责实现AMQP协议的逻辑功能,另外一个是用来存储消息的BackingQuerue
为了高效处理入队和出队消息,避免不必要的磁盘IO,BackingQueue进程为消息设计了4种状态和5个内部队列
1).4种状态包括
1.alpha.消息的内容和索引都在内存中
2.beta.消息内容在磁盘,索引在内存
3.gamma.消息的内容在磁盘,索引在磁盘和内存中都有
4.delta.消息的内容和索引都在磁盘
对于持久化消息,RabbitMQ先将消息的内容和索引保存在磁盘中,然后才处于上面的某种状态(即只可能处于alpha、gamma、delta三种状态之一)。
(2) 5个内部队列
包括:q1、q2、delta、q3、q4。q1和q4队列中只有alpha状态的消息;q2和q3包含beta和gamma状态的消息;delta队列是消息按序存盘后的一种逻辑队列,只有delta状态的消息。所以delta队列并不在内存中,其他4个队列则是由erlang queue模块实现。
消息从q1入队,q4出队,在内部队列中传递的过程一般是经q1顺序到q4。实际执行并非必然如此:开始时所有队列都为空,消息直接进入q4(没有消息堆积时);内存紧张时将q4队尾部分消息转入q3,进而再由q3转入delta,此时新来的消息将存入q1(有消息堆积时)。
当内存紧张时触发paging,paging将大量alpha状态的消息转换为beta和gamma;如果内存依然紧张,继续将beta和gamma状态转换为delta状态。Paging是一个持续过程,涉及到大量消息的多种状态转换,所以Paging的开销较大,严重影响系统性能。
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。
为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。
我们将消息持久化后,假如消费端出现异常,rabbitmq服务器会将消息缓存到内存,当生产者发送一直发送消息而消费者都没有正常消费时消息就会将这些消息全部保存在内存,当我们的消息过多时,就可能导致rabbitmq服务器内存泄漏
解决办法:1.开启ack确认机制,2.消费端设置重试机制(默认是三次)
开启之后生产者客户端没有收到消费者的消息ack反馈就会认为该消息没有被消费,就会保存到内存,但是设置重试机制后,消费端消费了设置的重试次数之后,假如还没有收到ack反馈,就会将缓存在内存中的消息移除,这样就可以防止内存溢出
实际场景中,有的生产者发送消息必须保证成功发送到消息队列中,那么如何保证投递呢,RabbitMQ 提供了两种方式事务方式, AMQP的事务机制实现和发布确认(confirm)
AMQP的事务机制
由amqp协议提供的一种保证消息成功投递的方式
通过将信道开启transactional模式
并利用信道Channel的三个方式来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递
channel.txSelect(): 开启事务
channel.txCommit() :提交事务
channel.txRollback() :回滚事务
此种方式需要开启tcp连接,tcp连接的三次握手,4次挥手,需要使用大量资源,导致开此事务和不开启事务的方式性能要慢10倍
发布确认(confirm)
发布确认默认协议中已经存在的消费者ACK确认机制,生产者将信道设置成confirm模式,一旦信道进入confirm模式是,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,brokey就会发送一个确认给生产者(包含消息的唯一id),这就使得生产者知道消息正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出
rabbitmq的发送确认方式,也是通过信道开启的
而确认模式又分为同步等待mq服务起确认和异步等待确认两种
同步等待 channel.waitForConfirms()
异步监听等待 channel.addConfirmListener(listener)
发布确认 主要是用来判断消息是否有正确到达交换机,如果有,那么就 ack 就返回 true;如果没有,则是 false。
事务等待发布消息,性能太差,往往不采用同步等待的方式发布消息,建议采用异步发送确认的方式
本篇博客主要是理论知识,没有验证代码
有兴趣的小伙伴,可以加微信技术交流群,遇到问题可以一起讨论
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。