当前位置:   article > 正文

RabbitMq的五大消息模型及Java代码演示_rabbitmq五种消息模型

rabbitmq五种消息模型

RabbitMq一共有6种消息模型,分别为:

1.基本模型(Hello World)

2.消息分发(工作队列)模型(Work queues)

3.Fanout订阅模型-消息订阅模式(Publish/Subscribe)

4.Direct订阅模型-路由模式(Routing)

5.Topic订阅模型-匹配模式(Topic)

6.RPC模式(RPC)

而其中的RPC模式,它并不完全属于消息队列(MQ)的定义,而是使用MQ的某些功能来实现RPC的效果。

所以简单的介绍下以下五种消息模型:

一.基本模型(Hello World)

1.这是个非常简单的模型,即一个生产者发送消息到一个队列,一个消费者从该队列中接收并处理消息。

2.主要适用于RabbitMq的测试和日常我们学习的场景,在需要快速搭建轻量级消息队列系统的应用中,简单模式也是个很好的选择。

代码实现:

在RabbitMq的webUI界面去创建,为了方便我直接在RabbitMq管理页面创建一个名为basic1的队列(我们也可以在Java代码中去配置queue,交换机及其绑定,为了方便展示用了管理页面)

用postMan请求,生产者发送消息

  1. package com.org.product.controller;
  2. import com.org.product.config.MqProductCallBack;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.MessageDeliveryMode;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import org.springframework.web.bind.annotation.RestController;
  10. import javax.annotation.Resource;
  11. /**
  12. * Created with IntelliJ IDEA.
  13. * @Author: 你的名字
  14. */
  15. @RestController
  16. @RequestMapping("/product")
  17. @Slf4j
  18. public class PushMessageController {
  19. @Resource
  20. private RabbitTemplate rabbitTemplate;
  21. @Resource
  22. private MqProductCallBack mqProductCallBack;
  23. @GetMapping("/basic")
  24. public String basic(){
  25. log.info("-------------消息推送开始--------------");
  26. //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
  27. CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
  28. //消息确认与返回
  29. rabbitTemplate.setConfirmCallback(mqProductCallBack);
  30. rabbitTemplate.setReturnsCallback(mqProductCallBack);
  31. /**
  32. *消息发送
  33. *在简单模式下,由于使用默认的交换机,则要保持路由名称和队列名称一致,才能把消息发送到队列中去;
  34. *也就是routingKey写队列名称
  35. */
  36. rabbitTemplate.convertAndSend("basic1","小飞棍来喽!",
  37. //Lambda表达式,实现MessagePostProcessor接口
  38. message -> {
  39. //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
  40. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  41. //返回修改后的消息
  42. return message;
  43. }, correlationData);
  44. log.info("--------------消息推送结束------------------");
  45. return "消息发送成功了!!!";
  46. }
  47. }

控制台

消费者监听 

  1. package com.org.consumer.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * Created with IntelliJ IDEA.
  6. *
  7. * @Author: 你的名字
  8. * @Description:监听消息
  9. */
  10. @Component
  11. public class MessageListener {
  12. @RabbitListener(queues ="basic1")
  13. public void handleMessage(String message) {
  14. //处理接收到的消息
  15. System.out.println("接收到的消息是:" + message);
  16. }
  17. //手动ack
  18. // @RabbitListener(queues = "queue2")
  19. // public void handleMessage1(String message,Channel channel, @Header(AmqpHeaders.DELIVERY_TAG)Long tag) throws IOException {
  20. // System.out.println("接收到的消息是:" + message);
  21. // try {
  22. // Thread.sleep(2000);
  23. // //发生异常
  24. // int i = 1/0;
  25. // //手动ack
  26. // channel.basicAck(tag,false);
  27. // } catch (Exception e) {
  28. // //手动ack,让消息重回队列,参数三表示是否重回队列
  29. // channel.basicNack(tag,false,false);
  30. //
  31. // }
  32. //
  33. // }
  34. }

控制台

二 .消息分发(工作队列)模型(Work queues)

1.工作队列将一些耗时的任务封装为消息,并将其发送到一个中心队列,多个消费者同时从队列中获取任务,每个任务只会被一个消费者获取并处理。

2.工作队列模型主要适用于长时间运行的任务场景,例如订单系统,当单个订单的处理时间较长(比如15秒),我们可以将多个订单同时放入消息队列中,让多个消费者同时处理这些订单,从而实现并行处理而不是串行处理;还可以实现需要异步处理任务的场景,如更新数据库等。

代码实现:

和上边一样我们先创建名为work1的队列

生产者代码

  1. @GetMapping("/work")
  2. public String work(){
  3. log.info("-------------消息推送开始--------------");
  4. //发送10条消息测试
  5. for (int i = 1; i <= 10; i++) {
  6. //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
  7. CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
  8. //消息确认与返回
  9. rabbitTemplate.setConfirmCallback(mqProductCallBack);
  10. rabbitTemplate.setReturnsCallback(mqProductCallBack);
  11. /**
  12. *消息发送
  13. *在工作队列模式下,由于使用默认的交换机,则要保持路由名称和队列名称一致,才能把消息发送到队列中去;
  14. *也就是routingKey写队列名称
  15. */
  16. rabbitTemplate.convertAndSend("work1","小飞棍来喽"+i+"!",
  17. //Lambda表达式,实现MessagePostProcessor接口
  18. message -> {
  19. //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
  20. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  21. //返回修改后的消息
  22. return message;
  23. }, correlationData);
  24. }
  25. log.info("--------------消息推送结束------------------");
  26. return "消息发送成功了!!!";
  27. }

控制台

 消费者代码

  1. @RabbitListener(queues = "work1")
  2. public void workQueue1(String message){
  3. System.out.println("消费者--1--接收到的消息是:" + message);
  4. }
  5. @RabbitListener(queues = "work1")
  6. public void workQueue2(String message){
  7. System.out.println("消费者--2--接收到的消息是:" + message);
  8. }

控制台

三. Fanout订阅模型-消息订阅模式(Publish/Subscribe)

1.Fanout订阅模型和工作队列模式比较,前者用到了fanout交换机,并广播到多个队列,消息生产者将消息发送到交换机(Exchange),而交换机以广播的形式将消息发送给所有与该交换机绑定的队列,从而实现了消息的广播。

2.Fanout交换机:主要用来将消息广播给绑定该交换机的队列,不关心消息的路由键是什么,也就是无论消息路由键是什么,Fanout交换机都将消息发送给绑定的队列,与Direct交换机相比,不需要设置路由键就可以将消息广播到所有队列,与Topic交换机相比,它不支持通配符匹配路由键。

3.主要适用于将消息广播给多个消费者,例如:日志记录,实时消息推送等

代码实现:

(1)首先我们在web界面创建名为fanout_queue1,fanout_queue2队列。

(2)在创建Fanout类型交换机,名为fanout_exchange。

(3)绑定队列fanout_queue1,fanout_queue2

路由键Routing key 设不设置都可以,我这边就不设置了

生产者代码

  1. @GetMapping("/fanout")
  2. public String fanout() {
  3. for (int i = 1; i <= 4; i++) {
  4. //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
  5. CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
  6. //消息确认与返回
  7. rabbitTemplate.setConfirmCallback(mqProductCallBack);
  8. rabbitTemplate.setReturnsCallback(mqProductCallBack);
  9. /**
  10. *消息发送
  11. *在fanout订阅模式我们可以随意定义路由
  12. *也就是routingKey写队列名称
  13. */
  14. rabbitTemplate.convertAndSend("fanout_exchange", "666", "小飞棍来喽" + i + "!",
  15. //Lambda表达式,实现MessagePostProcessor接口
  16. message -> {
  17. //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
  18. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  19. //返回修改后的消息
  20. return message;
  21. }, correlationData);
  22. }
  23. return "消息发送成功了!!!";
  24. }

控制台:4条消息发送成功

消费者代码:定义两个分别监听两个队列的消费者

  1. //fanout订阅模式
  2. @RabbitListener(queues = "fanout_queue1")
  3. public void fanout1(String message){
  4. System.out.println("fanout_queue1--的消息是:" + message);
  5. }
  6. @RabbitListener(queues = "fanout_queue2")
  7. public void fonout2(String message){
  8. System.out.println("fanout_queue2--的消息是:" + message);
  9. }

 控制台:4条消息,分别广播到两个队列

.Direct订阅模型-路由模式(Routing)

 1.Direct订阅模式用到了Direct交换机,不同于Fanout订阅模式,它必须指定路由键,允许消费者只接收与特定路由键匹配的消息,从而实现了有选择性接收,它的匹配方式是精确匹配,即路由键和绑定键必须完全相同,不同则无法将消息正确路由到对应的队列,消息可能会被丢弃或者根据RabbitMq配置进行处理(如返回给生产者或者进入死信队列)。

2.Direct订阅模式适用于日志处理,订单系统等,比如我们根据订单号将订单消息路由到特定的处理队列中。

代码实现:

(1)我们在RabbitMq的web管理页面创建Direct交换机,名为direct_exchange。

(2)然后创建两个队列,分列为direct_queue1,direct_queue2。(你也可以用之前创建过的)

(3)交换机direct_exchange和direct_queue1的绑定路由键为direct_key1。

         交换机direct_exchange与direct_queue2绑定的路由键为direct_key2。

生产者代码:将 "小飞棍来喽A、B、C" 发送到direct_queue2队列,将 "小飞棍来喽A" 发送到direct_queue1队列

  1. @GetMapping("/direct")
  2. public String direct() {
  3. //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
  4. CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
  5. //消息确认与返回
  6. rabbitTemplate.setConfirmCallback(mqProductCallBack);
  7. rabbitTemplate.setReturnsCallback(mqProductCallBack);
  8. /**
  9. *消息发送
  10. *在direct订阅模式我们要精准路由
  11. */
  12. rabbitTemplate.convertAndSend("direct_exchange", "direct_key2", "小飞棍来喽A", message -> {
  13. //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
  14. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  15. //返回修改后的消息
  16. return message;},correlationData);
  17. rabbitTemplate.convertAndSend("direct_exchange","direct_key2","小飞棍来喽B",message -> {
  18. //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
  19. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  20. //返回修改后的消息
  21. return message;},correlationData);
  22. rabbitTemplate.convertAndSend("direct_exchange","direct_key2","小飞棍来喽C",message -> {
  23. //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
  24. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  25. //返回修改后的消息
  26. return message;},correlationData);
  27. rabbitTemplate.convertAndSend("direct_exchange", "direct_key1", "小飞棍来喽A",message -> {
  28. //获取消息的属性,设置传输模式DeliveryMode为持久化,会写入磁盘
  29. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  30. //返回修改后的消息
  31. return message;},correlationData);
  32. return "消息发送成功了!!!";
  33. }

控制台:4条消息发送成功

消费者代码:

  1. //direct订阅模式
  2. @RabbitListener(queues = "direct_queue1")
  3. public void direct1(String message){
  4. System.out.println("direct_queue1--的消息是:" + message);
  5. }
  6. @RabbitListener(queues = "direct_queue2")
  7. public void direct2(String message){
  8. System.out.println("direct_queue2--的消息是:" + message);
  9. }

 控制台:消费者把对应的队列消息消费下来了

五.5.Topic订阅模型-匹配模式(Topic)

 1.Topic订阅模型相比Direct订阅模型,分组更细致,也更灵活,他允许在路由键上使用通配符,并根据发送消息时携带的路由键进行模糊匹配。需要注意的是用通配符匹配,单词之间要用(.)分隔。

2.通配符:
*(星号)匹配一个单词:例如,我们代码中配置的路由键是superman.A或superman.B或superman.C那么我们绑定的键(Binding Key)为superman.*的队列将会匹配到这个路由键,但是如果是superman.A.D,就不会匹配到

#(井号)匹配零个或者多个单词:例如,路由键是superman.A.D,那么绑定的superman.#的队列会匹配到这个路由键,当然如果是superman.A.D.E等都会匹配到(注意,不建议用#.#它会导致接收到大量的消息)

3.Topic订阅模型适用于根据消息内容或者类别来进行灵活分类。例如分布式任务调度,新闻订阅等。

代码实现:

先在RabbitMq管理页面新建topic_exchange交换机

然后建立两个队列topic_queue1,topic_queue2

最后topic_exchange交换机绑定topic_queue1的路由键为topic.*;绑定topic_queue2的路由键为topic.#

生产者代码

  1. @GetMapping("/topic")
  2. public String topic() {
  3. //创建CorrelationData对象,包含唯一id,id的作用是在回调函数中识别消息,也就是根据id跟踪这条消息
  4. CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
  5. //消息确认与返回
  6. rabbitTemplate.setConfirmCallback(mqProductCallBack);
  7. rabbitTemplate.setReturnsCallback(mqProductCallBack);
  8. /**
  9. * 发送给topic_queue1
  10. * 因为topic_queue1我们绑定的路由键是topic.*,所以我们routingKey可以写(topic.任何一个单词)
  11. */
  12. rabbitTemplate.convertAndSend("topic_exchange", "topic.A", "小飞棍来喽A!", message -> {
  13. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  14. return message;
  15. }, correlationData);
  16. /**
  17. * 发送给topic_queue2
  18. * 因为topic_queue2我们绑定的路由键是topic.#,所以我们routingKey可以写(topic.一个或者多个单词)
  19. */
  20. rabbitTemplate.convertAndSend("topic_exchange", "topic.B.B.B", "小飞棍来喽B", message -> {
  21. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  22. return message;
  23. }, correlationData);
  24. return "消息发送成功了!!!";
  25. }

消费者代码

  1. //topic订阅模式
  2. @RabbitListener(queues = "topic_queue1")
  3. public void topic1(String message){
  4. System.out.println("topic_queue1--的消息是:" + message);
  5. }
  6. @RabbitListener(queues = "topic_queue2")
  7. public void topic2(String message){
  8. System.out.println("topic_queue2--的消息是:" + message);
  9. }

消费者控制台

由上边的消费结果可以看出,topic_queue1接收到一条消息,topic_queue2接收到两条消息,

是因为topic.*的路由匹配topic.A,而匹配不到topic.B.B.B,

而topic.#的路由匹配到了topic.A以及topic.B.B.B

以上五种模型涵盖了RabbitMQ的主要消息传递方式,从简单的点到点通信到复杂的发布/订阅模式,可以满足不同应用场景的需求。同时,RabbitMQ还提供了消息确认机制、消息持久化、消息优先级和消息过期时间等特性,以保证消息的可靠性、持久性和灵活性,我们可以根据我们业务的需求场景来决定使用哪一种消息模式。

上边的SpringBoot代码工程在上一篇SpringBoot集成RabbitMq

链接:一文搞定springBoot集成RabbitMq-CSDN博客

欲渡黄河冰塞川,将登太行雪满山~~

文章有不足之处欢迎各位纠正,希望能和大家一起砥砺前行!!!加油

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号