赞
踩
RabbitMq一共有6种消息模型,分别为:
1.基本模型(Hello World)
2.消息分发(工作队列)模型(Work queues)
3.Fanout订阅模型-消息订阅模式(Publish/Subscribe)
4.Direct订阅模型-路由模式(Routing)
5.Topic订阅模型-匹配模式(Topic)
6.RPC模式(RPC)
而其中的RPC模式,它并不完全属于消息队列(MQ)的定义,而是使用MQ的某些功能来实现RPC的效果。
所以简单的介绍下以下五种消息模型:
1.这是个非常简单的模型,即一个生产者发送消息到一个队列,一个消费者从该队列中接收并处理消息。
2.主要适用于RabbitMq的测试和日常我们学习的场景,在需要快速搭建轻量级消息队列系统的应用中,简单模式也是个很好的选择。
代码实现:
在RabbitMq的webUI界面去创建,为了方便我直接在RabbitMq管理页面创建一个名为basic1的队列(我们也可以在Java代码中去配置queue,交换机及其绑定,为了方便展示用了管理页面)
用postMan请求,生产者发送消息
- package com.org.product.controller;
-
- import com.org.product.config.MqProductCallBack;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.MessageDeliveryMode;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
-
- /**
- * Created with IntelliJ IDEA.
- * @Author: 你的名字
- */
- @RestController
- @RequestMapping("/product")
- @Slf4j
- public class PushMessageController {
- @Resource
- private RabbitTemplate rabbitTemplate;
- @Resource
- private MqProductCallBack mqProductCallBack;
-
-
- @GetMapping("/basic")
- public String basic(){
- log.info("-------------消息推送开始--------------");
- //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
- CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
- //消息确认与返回
- rabbitTemplate.setConfirmCallback(mqProductCallBack);
- rabbitTemplate.setReturnsCallback(mqProductCallBack);
- /**
- *消息发送
- *在简单模式下,由于使用默认的交换机,则要保持路由名称和队列名称一致,才能把消息发送到队列中去;
- *也就是routingKey写队列名称
- */
- rabbitTemplate.convertAndSend("basic1","小飞棍来喽!",
- //Lambda表达式,实现MessagePostProcessor接口
- message -> {
- //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- //返回修改后的消息
- return message;
- }, correlationData);
- log.info("--------------消息推送结束------------------");
- return "消息发送成功了!!!";
- }
- }
控制台
消费者监听
- package com.org.consumer.listener;
-
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * Created with IntelliJ IDEA.
- *
- * @Author: 你的名字
- * @Description:监听消息
- */
- @Component
- public class MessageListener {
-
- @RabbitListener(queues ="basic1")
- public void handleMessage(String message) {
- //处理接收到的消息
- System.out.println("接收到的消息是:" + message);
- }
-
-
- //手动ack
- // @RabbitListener(queues = "queue2")
- // public void handleMessage1(String message,Channel channel, @Header(AmqpHeaders.DELIVERY_TAG)Long tag) throws IOException {
- // System.out.println("接收到的消息是:" + message);
- // try {
- // Thread.sleep(2000);
- // //发生异常
- // int i = 1/0;
- // //手动ack
- // channel.basicAck(tag,false);
- // } catch (Exception e) {
- // //手动ack,让消息重回队列,参数三表示是否重回队列
- // channel.basicNack(tag,false,false);
- //
- // }
- //
- // }
- }
控制台
1.工作队列将一些耗时的任务封装为消息,并将其发送到一个中心队列,多个消费者同时从队列中获取任务,每个任务只会被一个消费者获取并处理。
2.工作队列模型主要适用于长时间运行的任务场景,例如订单系统,当单个订单的处理时间较长(比如15秒),我们可以将多个订单同时放入消息队列中,让多个消费者同时处理这些订单,从而实现并行处理而不是串行处理;还可以实现需要异步处理任务的场景,如更新数据库等。
代码实现:
和上边一样我们先创建名为work1的队列
生产者代码
- @GetMapping("/work")
- public String work(){
- log.info("-------------消息推送开始--------------");
- //发送10条消息测试
- for (int i = 1; i <= 10; i++) {
- //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
- CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
- //消息确认与返回
- rabbitTemplate.setConfirmCallback(mqProductCallBack);
- rabbitTemplate.setReturnsCallback(mqProductCallBack);
- /**
- *消息发送
- *在工作队列模式下,由于使用默认的交换机,则要保持路由名称和队列名称一致,才能把消息发送到队列中去;
- *也就是routingKey写队列名称
- */
- rabbitTemplate.convertAndSend("work1","小飞棍来喽"+i+"!",
- //Lambda表达式,实现MessagePostProcessor接口
- message -> {
- //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- //返回修改后的消息
- return message;
- }, correlationData);
- }
- log.info("--------------消息推送结束------------------");
-
- return "消息发送成功了!!!";
- }
控制台
消费者代码
- @RabbitListener(queues = "work1")
- public void workQueue1(String message){
- System.out.println("消费者--1--接收到的消息是:" + message);
- }
-
- @RabbitListener(queues = "work1")
- public void workQueue2(String message){
- System.out.println("消费者--2--接收到的消息是:" + message);
- }
控制台
1.Fanout订阅模型和工作队列模式比较,前者用到了fanout交换机,并广播到多个队列,消息生产者将消息发送到交换机(Exchange),而交换机以广播的形式将消息发送给所有与该交换机绑定的队列,从而实现了消息的广播。
2.Fanout交换机:主要用来将消息广播给绑定该交换机的队列,不关心消息的路由键是什么,也就是无论消息路由键是什么,Fanout交换机都将消息发送给绑定的队列,与Direct交换机相比,不需要设置路由键就可以将消息广播到所有队列,与Topic交换机相比,它不支持通配符匹配路由键。
3.主要适用于将消息广播给多个消费者,例如:日志记录,实时消息推送等
代码实现:
(1)首先我们在web界面创建名为fanout_queue1,fanout_queue2队列。
(2)在创建Fanout类型交换机,名为fanout_exchange。
(3)绑定队列fanout_queue1,fanout_queue2
路由键Routing key 设不设置都可以,我这边就不设置了
生产者代码
- @GetMapping("/fanout")
- public String fanout() {
- for (int i = 1; i <= 4; i++) {
- //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
- CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
- //消息确认与返回
- rabbitTemplate.setConfirmCallback(mqProductCallBack);
- rabbitTemplate.setReturnsCallback(mqProductCallBack);
- /**
- *消息发送
- *在fanout订阅模式我们可以随意定义路由
- *也就是routingKey写队列名称
- */
- rabbitTemplate.convertAndSend("fanout_exchange", "666", "小飞棍来喽" + i + "!",
- //Lambda表达式,实现MessagePostProcessor接口
- message -> {
- //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- //返回修改后的消息
- return message;
- }, correlationData);
- }
- return "消息发送成功了!!!";
- }
控制台:4条消息发送成功
消费者代码:定义两个分别监听两个队列的消费者
- //fanout订阅模式
- @RabbitListener(queues = "fanout_queue1")
- public void fanout1(String message){
- System.out.println("fanout_queue1--的消息是:" + message);
- }
-
- @RabbitListener(queues = "fanout_queue2")
- public void fonout2(String message){
- System.out.println("fanout_queue2--的消息是:" + message);
- }
控制台:4条消息,分别广播到两个队列
1.Direct订阅模式用到了Direct交换机,不同于Fanout订阅模式,它必须指定路由键,允许消费者只接收与特定路由键匹配的消息,从而实现了有选择性接收,它的匹配方式是精确匹配,即路由键和绑定键必须完全相同,不同则无法将消息正确路由到对应的队列,消息可能会被丢弃或者根据RabbitMq配置进行处理(如返回给生产者或者进入死信队列)。
2.Direct订阅模式适用于日志处理,订单系统等,比如我们根据订单号将订单消息路由到特定的处理队列中。
代码实现:
(1)我们在RabbitMq的web管理页面创建Direct交换机,名为direct_exchange。
(2)然后创建两个队列,分列为direct_queue1,direct_queue2。(你也可以用之前创建过的)
(3)交换机direct_exchange和direct_queue1的绑定路由键为direct_key1。
交换机direct_exchange与direct_queue2绑定的路由键为direct_key2。
生产者代码:将 "小飞棍来喽A、B、C" 发送到direct_queue2队列,将 "小飞棍来喽A" 发送到direct_queue1队列
- @GetMapping("/direct")
- public String direct() {
- //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
- CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
- //消息确认与返回
- rabbitTemplate.setConfirmCallback(mqProductCallBack);
- rabbitTemplate.setReturnsCallback(mqProductCallBack);
- /**
- *消息发送
- *在direct订阅模式我们要精准路由
- */
- rabbitTemplate.convertAndSend("direct_exchange", "direct_key2", "小飞棍来喽A", message -> {
- //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- //返回修改后的消息
- return message;},correlationData);
- rabbitTemplate.convertAndSend("direct_exchange","direct_key2","小飞棍来喽B",message -> {
- //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- //返回修改后的消息
- return message;},correlationData);
- rabbitTemplate.convertAndSend("direct_exchange","direct_key2","小飞棍来喽C",message -> {
- //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- //返回修改后的消息
- return message;},correlationData);
- rabbitTemplate.convertAndSend("direct_exchange", "direct_key1", "小飞棍来喽A",message -> {
- //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- //返回修改后的消息
- return message;},correlationData);
- return "消息发送成功了!!!";
- }
控制台:4条消息发送成功
消费者代码:
- //direct订阅模式
- @RabbitListener(queues = "direct_queue1")
- public void direct1(String message){
- System.out.println("direct_queue1--的消息是:" + message);
- }
-
- @RabbitListener(queues = "direct_queue2")
- public void direct2(String message){
- System.out.println("direct_queue2--的消息是:" + message);
- }
控制台:消费者把对应的队列消息消费下来了
1.Topic订阅模型相比Direct订阅模型,分组更细致,也更灵活,他允许在路由键上使用通配符,并根据发送消息时携带的路由键进行模糊匹配。需要注意的是用通配符匹配,单词之间要用(.)分隔。
2.通配符:
*(星号)匹配一个单词:例如,我们代码中配置的路由键是superman.A或superman.B或superman.C那么我们绑定的键(Binding Key)为superman.*的队列将会匹配到这个路由键,但是如果是superman.A.D,就不会匹配到#(井号)匹配零个或者多个单词:例如,路由键是superman.A.D,那么绑定的superman.#的队列会匹配到这个路由键,当然如果是superman.A.D.E等都会匹配到(注意,不建议用#.#它会导致接收到大量的消息)
3.Topic订阅模型适用于根据消息内容或者类别来进行灵活分类。例如分布式任务调度,新闻订阅等。
代码实现:
先在RabbitMq管理页面新建topic_exchange交换机
然后建立两个队列topic_queue1,topic_queue2
最后topic_exchange交换机绑定topic_queue1的路由键为topic.*;绑定topic_queue2的路由键为topic.#
生产者代码
- @GetMapping("/topic")
- public String topic() {
- //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
- CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
- //消息确认与返回
- rabbitTemplate.setConfirmCallback(mqProductCallBack);
- rabbitTemplate.setReturnsCallback(mqProductCallBack);
- /**
- * 发送给topic_queue1
- * 因为topic_queue1我们绑定的路由键是topic.*,所以我们routingKey可以写(topic.任何一个单词)
- */
- rabbitTemplate.convertAndSend("topic_exchange", "topic.A", "小飞棍来喽A!", message -> {
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- return message;
- }, correlationData);
- /**
- * 发送给topic_queue2
- * 因为topic_queue2我们绑定的路由键是topic.#,所以我们routingKey可以写(topic.一个或者多个单词)
- */
- rabbitTemplate.convertAndSend("topic_exchange", "topic.B.B.B", "小飞棍来喽B", message -> {
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- return message;
- }, correlationData);
-
- return "消息发送成功了!!!";
- }
消费者代码
- //topic订阅模式
- @RabbitListener(queues = "topic_queue1")
- public void topic1(String message){
- System.out.println("topic_queue1--的消息是:" + message);
- }
-
- @RabbitListener(queues = "topic_queue2")
- public void topic2(String message){
- System.out.println("topic_queue2--的消息是:" + message);
- }
消费者控制台
由上边的消费结果可以看出,topic_queue1接收到一条消息,topic_queue2接收到两条消息,
是因为topic.*的路由匹配topic.A,而匹配不到topic.B.B.B,
而topic.#的路由匹配到了topic.A以及topic.B.B.B
以上五种模型涵盖了RabbitMQ的主要消息传递方式,从简单的点到点通信到复杂的发布/订阅模式,可以满足不同应用场景的需求。同时,RabbitMQ还提供了消息确认机制、消息持久化、消息优先级和消息过期时间等特性,以保证消息的可靠性、持久性和灵活性,我们可以根据我们业务的需求场景来决定使用哪一种消息模式。
上边的SpringBoot代码工程在上一篇SpringBoot集成RabbitMq
链接:一文搞定springBoot集成RabbitMq-CSDN博客
欲渡黄河冰塞川,将登太行雪满山~~
文章有不足之处欢迎各位纠正,希望能和大家一起砥砺前行!!!加油
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。