赞
踩
SpringBoot 整合 RabbitMQ 实现五种消息模型
SpringBoot 整合 RabbitMQ 实现五种消息模型源码
MQ 是消息通信的模型,并发具体实现。现在实现 MQ 的有两种主流方式:AMQP、JMS。
AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang 中的实现有 RabbitMQ 等。更多介绍
JMS,即 Java 消息服务(Java Message Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。更多介绍
两者间的区别和联系:
常见 MQ 产品
RabbitMQ 是基于 AMQP 的一款消息管理系统,RabbitMQ 基于 Erlang 语言开发,安装之前需要先安装 Erlang 的相关依赖。
官网: http://www.rabbitmq.com/
官方教程:http://www.rabbitmq.com/getstarted.html
官网下载地址:http://www.rabbitmq.com/download.html
Erlang 下载地址:http://www.erlang.org/download.html
RabbitMQ 在 Windows 的安装教程:https://blog.csdn.net/weixin_39735923/article/details/79288578
RabbitMQ 提供了 6 种消息模型,但是第 6 种其实是 RPC,并不是 MQ,因此不予学习。那么也就剩下 5 种。
但是其实 3、4、5 这三种都属于订阅模型,只不过进行路由的方式不同。
官方文档说明:
RabbitMQ 是一个消息的代理者(Message Broker):它接收消息并且传递消息。
你可以认为它是一个邮局:当你投递邮件到一个邮箱,你很肯定邮递员会终究会将邮件递交给你的收件人。与此类似,RabbitMQ 可以是一个邮箱、邮局、同时还有邮递员。
不同之处在于:RabbitMQ 不是传递纸质邮件,而是二进制的数据
基本消息模型图:
在上图的模型中,有以下概念:
通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。
那么问题来了:RabbitMQ 怎么知道消息被接收了呢?
这就要通过消息确认机制(Acknowlege)来实现了。当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。不过这种回执 ACK 分两种情况:
大家觉得哪种更好呢?
这需要看消息的重要性:
手动 ACK
channel.basicConsume(QUEUE_NAME, false, consumer);
如果第二个参数为 true,则会自动进行 ACK;如果为 false,则需要手动 ACK。
在刚才的基本模型中,一个生产者,一个消费者,生产的消息直接被消费者消费。比较简单。
Work queues,也被称为(Task queues),任务模型。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
// 设置每个消费者同时只能处理一条消息
channel.basicQos(1);
在广播模式下,消息发送流程是这样的:
要注意代码中:**队列需要和交换机绑定
在 Fanout 模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到 Direct 类型的 Exchange。
在 Direct 模型下:
RoutingKey
(路由 key)RoutingKey
。Routing Key
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息流程图:
图解:
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好 1 个词
举例:
audit.#
:能够匹配audit.irs.corporate
或者 audit.irs
audit.*
:只能匹配audit.irs
图示:
解释:
usa.#
,因此凡是以 usa.
开头的routing key
都会被匹配到#.news
,因此凡是以 .news
结尾的 routing key
都会被匹配如何避免消息丢失?
1) 消费者的 ACK 机制。可以防止消费者丢失消息。
2) 但是,如果在消费者消费之前,MQ 就宕机了,消息就没了。
是可以将消息进行持久化呢?
要将消息持久化,前提是:队列、Exchange 都持久化
交换机持久化
// 声明exchange,指定类型为topic,true表示持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
队列持久化
// 声明队列,第二个参数true表示持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
消息持久化
// 发送消息,第三个参数设置持久化
channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。