1、消息队列种类繁多,例如RabbitMQ、ActiveMQ、ZeroMQ、Kafka等,消息队列的使用场景主要有:异步处理、应用解耦、流量消峰及消息通讯等。
RabbitMQ是采用Erlang语言、基于AMQP(Advanced Message Queue Protocol 高级消息队列协议)的开源实现。
2、RabbitMQ是一个消息代理,它的工作就是接收和发送消息。
消息生产者Producer:发送消息至消息队列。
消息消费者Consumer:从消息队列接收消息。
消息队列Queue:一个先进先出FIFO的消息存储区域。消息按照顺序发送接收,一旦消息被消费处理,该消息将从队列中删除。
主题Topic:支持消息多个订阅者的机制。
生产者、消费者和RabbitMQ可以部署至不同的机器,底层通讯默认使用的是TCP Socket连接。
3.点对点/队列消息模型(Queue)
生产者向一个特定的队列发送消息,消费者从该队列中接收消息;消息的生产者和消费者可以不同时处于运行状态,即点对点/队列消息模型支持消息的持久化。每一个成功处理的消息都由消息消费者签收确认(Acknowledge)。
4.发布订阅消息模型(Topic)
发布订阅模型,支持向一个特定的消息主题Topic发布消息,0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方的存在。但消息接收在发布者和订阅者之间存在时间依赖性,即必须先订阅,再发送消息,然后接收订阅的消息,此顺序必须保证。
5.消息的自动确认(ACK)机制
为了保证消息不丢失,RabbitMQ提供了消息确认Acknowledge机制,即ACK机制,RabbitMQ提供两种确认机制:自动确认和手工确认。
对于自动确认机制,如果消费者从队列中获取到消息,那么RabbitMQ 会自动将该消息标记为已确认,从队列中删除该消息。假如消费者在处理消息过程中宕机或异常关闭,那么该消息将会丢失。
对于手工确认机制,当Consumer确认消息已经被消费处理,发送一个ACK给消息队列,此时消息队列便可以删除这个消息。如果Consumer宕机/关闭,没有发送ACK,消息队列认为这个消息没有被处理,会将这个消息重新发给其他的Consumer重新消费处理。
因此,消息的ACK机制是面向消息的消费而言。如果消息未ACK,只有在当前Consumer关闭后,其他的Consumer才能继续消费这个消息。
ACK机制:
AutoAcknowledge:消息自动确认
IndividualAcknowledge:消息显式逐条确认
6.RabbitMQ 基本概念
打开RabbitMQ监控管理界面,可以看到RabbitMQ由很多不同的部分组成:
Connection:连接,位于客户端和Broker之间的TCP连接
Channel:信道,仅仅创建了客户端和Broker之间的连接后,客户端还不能发送消息。需要为每一个Connection创建Channel,AMQP协议只有通过 Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。
Exchange:接收生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。Exchange类型有direct、Fanout和Topic三种,不同类型的Exchange路由的行为不同。其中发布订阅模型设定的Exchange Type即为Topic,点对点的队列Queue的Exchange Type为direct。
Queue:消息队列,用于存储还未被消费者消费的消息。
Broker:MQ节点,接受客户端连接,实现AMQP消息队列和路由功能的进程。
Binding:联系Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即RoutingKey。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在绑定Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
Virtual Host:是一个虚拟概念,类似于权限控制组,一个virtual host里面可以 若干个Exchange和Queue,权限控制的最小力度是virtual host.
7.RabbitMQ的简单执行命令
RabbitMQ不但提供了监控管理界面,并且提供了完善的执行命令,用来查看其运行状态及操作,熟练的使用命令,是快速查看RabbitMQ执行情况的良好方式。在RabbitMQ安装完成后,从开始菜单--》RabbitMQ Server-》RabbitMQ Command Prompt (sbin dir) 打开控制台
rabbitmqctl status:查看服务器状态
rabbitmqctl list_queues:查看队列信息
rabbitmqctl list_exchanges:查看exchange信息
rabbitmqctl list_bindings:查看binding信息
rabbitmqctl list_connections:查看连接信息
rabbitmqctl list_channels :查看channel信息
rabbitmqctl add_user userName password
rabbitmqctl change_password userName newPassword
rabbitMQctl join_cluster rabbitMQ_Node_name:将某个节点添加至集群
rabbitmqctl forget_cluster rabbitMQ_Node_Name:将某个节点从集群中移除
附简单的技术问答题:
问题1:RabbitMQ基于什么协议?用哪种语言开发?AMQP的全称是什么?
答案:RabbitMQ基于AMQP协议,采用Erlang语言开发。AMQP是Advanced Message Queue Protocol,即高级消息队列协议。
问题2:
消息应用中心根据任务元数据创建的队列名称为什么会包含Ha前缀?
答案: 为了在多个Broker之间进行消息的镜像同步
问题3:生产者发送消息到RabbitMQ,其实是发送消息至Exchange,Exchange根据什么决定消息转发至具体某个或某些队列?
答案:Exchange根据Routing Key决定消息路由到哪个或哪些队列。
另外:RoutingKey:指定当前消息被谁接收
BindingKey:指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中
BindingKey是Exchange和Queue绑定的规则描述,这个描述用来解析当Exchange接收到消息时,Exchange接收到的消息会带有RoutingKey这个字段,Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,如果满足要求,就往BindingKey所绑定的Queue发送消息,这样我们就解决了我们向RabbitMQ发送一次消息,可以分发到不同的Queue
问题4:RabbitMQ的ACK有几类?
答案:AutoAcknowledge:消息自动确认
IndividualAcknowledge:消息显式逐条确认
问题5:RabbitMQ Exchange Type分为几类?
答案:direct、fanout、topic、headers
问题6:RabbitMQ发布订阅必须保证操作顺序,具体如何操作?
答案:先订阅,再发送消息,而后接收订阅的消息,该顺序必须保证。
问题7:RabbitMQ新增用户的命令是什么?
答案:rabbitmqctl add_user UserName password
问题8:不特殊指定的发布订阅模式,在订阅者断开以后,对应的订阅Queue是否存在?
答案:发布订阅模式,如果Consumer由于某种原因与MQ断开连接,那么MQ会自动将与该Consumer相关的订阅Queue删除。也就意味着,consumer断开以后,再重新连接,订阅队列会改变,即不特殊指定的情况下,发布订阅模式消息不会持久化。
问题9.如果某个Message Queue存在消息积压,如何判定是由于什么原因造成?
答案:
1.判定该Message Queue是否存在对应的consumer,如果无consumer,需要判定该Message Queue对应的消费程序是否开启。
2.如果存在consumer,需要判定该Message Queue的unAcked 的消息个数,如果该值过大,可能原因是发送端TPM增大或消费端处理速度变慢。
3.查看该Message Queue 发送端TPM是否有明显增大,如果是,那么可以认定发送TPM增大是其中一个原因。
4.查看Message Queue 消费的最大响应时间,如果响应时间较大,那么消费消息TPM减小是产生消息的一个原因。要查找具体的链路信息,分析原因。
5.查看MQ 与Consumer 的网络是否正常。
RabbitMQ的内部机制非常复杂,虽然不容易出现问题,但偶尔也会发小脾气,因此了解这位白娘子也是非常必要且必须的。