当前位置:   article > 正文

【SpringBoot学习】12、RabbitMQ 五种消息模型概念_springboot rabbitmq五种消息模型

springboot rabbitmq五种消息模型

AMQP 和 JMS

SpringBoot 整合 RabbitMQ 实现五种消息模型

SpringBoot 整合 RabbitMQ 实现五种消息模型源码

MQ 是消息通信的模型,并发具体实现。现在实现 MQ 的有两种主流方式:AMQP、JMS。

AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang 中的实现有 RabbitMQ 等。更多介绍

JMS,即 Java 消息服务(Java Message Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。更多介绍

两者间的区别和联系:

  • JMS 是定义了统一的接口,来对消息操作进行统一;AMQP 是通过规定协议来统一数据交互的格式
  • JMS 限定了必须使用 Java 语言;AMQP 只是协议,不规定实现方式,因此是跨语言的。
  • JMS 规定了两种消息模型;而 AMQP 的消息模型更加丰富

常见 MQ 产品

  • ActiveMQ:基于 JMS
  • RabbitMQ:基于 AMQP 协议,erlang 语言开发,稳定性好
  • RocketMQ:基于 JMS,阿里巴巴产品,目前交由 Apache 基金会
  • Kafka:分布式消息系统,高吞吐量

RabbitMQ

RabbitMQ 是基于 AMQP 的一款消息管理系统,RabbitMQ 基于 Erlang 语言开发,安装之前需要先安装 Erlang 的相关依赖。

官网: http://www.rabbitmq.com/
官方教程:http://www.rabbitmq.com/getstarted.html
官网下载地址:http://www.rabbitmq.com/download.html
Erlang 下载地址:http://www.erlang.org/download.html
RabbitMQ 在 Windows 的安装教程:https://blog.csdn.net/weixin_39735923/article/details/79288578

五种消息模型

RabbitMQ 提供了 6 种消息模型,但是第 6 种其实是 RPC,并不是 MQ,因此不予学习。那么也就剩下 5 种。

但是其实 3、4、5 这三种都属于订阅模型,只不过进行路由的方式不同。
在这里插入图片描述

一、基本消息模型

官方文档说明:

RabbitMQ 是一个消息的代理者(Message Broker):它接收消息并且传递消息。

你可以认为它是一个邮局:当你投递邮件到一个邮箱,你很肯定邮递员会终究会将邮件递交给你的收件人。与此类似,RabbitMQ 可以是一个邮箱、邮局、同时还有邮递员。

不同之处在于:RabbitMQ 不是传递纸质邮件,而是二进制的数据

基本消息模型图:
在这里插入图片描述

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
消费者的消息确认机制(Acknowlage)

通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。

那么问题来了:RabbitMQ 怎么知道消息被接收了呢?

这就要通过消息确认机制(Acknowlege)来实现了。当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。不过这种回执 ACK 分两种情况:

  • 自动 ACK:消息一旦被接收,消费者自动发送 ACK
  • 手动 ACK:消息接收后,不会发送 ACK,需要手动调用

大家觉得哪种更好呢?

这需要看消息的重要性:

  • 如果消息不太重要,丢失也没有影响,那么自动 ACK 会比较方便
  • 如果消息非常重要,不容丢失。那么最好在消费完成后手动 ACK,否则接收消息后就自动 ACK,RabbitMQ 就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

手动 ACK

channel.basicConsume(QUEUE_NAME, false, consumer);
  • 1

如果第二个参数为 true,则会自动进行 ACK;如果为 false,则需要手动 ACK。

二、work 消息模型

在刚才的基本模型中,一个生产者,一个消费者,生产的消息直接被消费者消费。比较简单。

Work queues,也被称为(Task queues),任务模型。

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

在这里插入图片描述
角色:

  • P:生产者:任务的发布者
  • C1:消费者,领取任务并且完成任务,假设完成速度较慢
  • C2:消费者 2:领取任务并完成任务,假设完成速度快
能者多劳
  • 消费者 1 比消费者 2 的效率要低,一次任务的耗时较长
  • 然而两人最终消费的消息数量是一样的
  • 消费者 2 大量时间处于空闲状态,消费者 1 一直忙碌

现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。

// 设置每个消费者同时只能处理一条消息
channel.basicQos(1);
  • 1
  • 2

三、订阅模型-Fanout

在广播模式下,消息发送流程是这样的:
在这里插入图片描述

  • 1) 可以有多个消费者
  • 2) 每个消费者有自己的 queue(队列)
  • 3) 每个队列都要绑定到 Exchange(交换机)
  • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 5) 交换机把消息发送给绑定过的所有队列
  • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

要注意代码中:**队列需要和交换机绑定

四、订阅模型-Direct

在 Fanout 模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到 Direct 类型的 Exchange。

在 Direct 模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由 key)
  • 消息的发送方在 向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

流程图:
在这里插入图片描述
图解:

  • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列
  • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息

五、订阅模型-Topic

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词
*:匹配不多不少恰好 1 个词

举例:
audit.#:能够匹配audit.irs.corporate 或者 audit.irs
audit.*:只能匹配audit.irs

图示:
在这里插入图片描述
解释:

  • 红色 Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
  • 黄色 Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配

持久化

如何避免消息丢失?

1) 消费者的 ACK 机制。可以防止消费者丢失消息。
2) 但是,如果在消费者消费之前,MQ 就宕机了,消息就没了。

是可以将消息进行持久化呢?

要将消息持久化,前提是:队列、Exchange 都持久化

交换机持久化

// 声明exchange,指定类型为topic,true表示持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
  • 1
  • 2

队列持久化

// 声明队列,第二个参数true表示持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  • 1
  • 2

消息持久化

// 发送消息,第三个参数设置持久化
channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  • 1
  • 2

微信公众号

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/582164
推荐阅读
相关标签
  

闽ICP备14008679号