当前位置:   article > 正文

RabbitMQ工作模式(3) - 订阅模式_rabbitmq 订阅模式

rabbitmq 订阅模式

 概念

发布/订阅模式(Publish/Subscribe)是 RabbitMQ 中常见的一种消息传递模式,用于将消息广播给多个消费者。在这种模式中,消息发送者(发布者)将消息发送到一个交换机(exchange),交换机将消息广播到所有与之绑定的队列,然后消费者(订阅者)可以从这些队列中接收消息。

工作流程

  1. 生产者发送消息: 生产者将消息发送到一个交换机,而不是直接发送到队列。

  2. 交换机将消息广播: 交换机接收到消息后,根据预定义的规则将消息广播到所有与之绑定的队列。这个过程称为消息路由。

  3. 多个消费者监听队列: 多个消费者可以分别监听不同的队列,或者监听同一个队列。

  4. 消息处理: 每个消费者接收到广播的消息后,进行相应的处理。每个消息只会被消费一次,但是可以被多个消费者同时处理。

 

特点

  • 消息广播:消息被广播到所有与交换机绑定的队列,而不是直接发送到特定的队列。
  • 解耦合:发布者和订阅者之间通过交换机进行解耦,发布者无需知道消息将被传递到哪些队列。
  • 多播:支持多个消费者同时处理同一条消息,以实现消息的多播效果。
  • 灵活性:可以根据需要使用不同类型的交换机和绑定规则,以满足不同的消息传递需求。

发布/订阅模式适用于需要将消息广播给多个消费者的场景,例如实时通知、日志记录、事件处理等。

 Springboot集成

这里为了方便和速度就不配置yml文件中,直接编辑,这里配置两个队列

交换机名称: exchange_sub

队列一名称: queue_sub_01

队列一名称: queue_sub_02

1.创建队列和交换机并绑定

在SubConfig文件中配置

这里方便区分,新建了文件SubConfig,每个工作模式创建队列和交换机的过程区分开,全都配置到RabbitmqConfig文件中也是可以的,同时也可以通过RabbitAdmin进行绑定(另一种方式)。

  1. package com.model.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * @Author: Haiven
  7. * @Time: 2024/4/19 16:29
  8. * @Description: TODO
  9. */
  10. @Configuration
  11. public class SubConfig {
  12. /**
  13. * 发布/订阅模式的交换机
  14. * @return exchange
  15. */
  16. @Bean(name = "subExchange")
  17. public Exchange getSubExchange(){
  18. return ExchangeBuilder
  19. .fanoutExchange("exchange_sub")
  20. .durable(true)
  21. .build();
  22. }
  23. /**
  24. * 发布/订阅模式的队列 1
  25. * @return 队列 1
  26. */
  27. @Bean(name = "subQueue01")
  28. public Queue getSubQueue01(){
  29. return QueueBuilder
  30. .durable("queue_sub_01")
  31. .build();
  32. }
  33. /**
  34. * 发布/订阅模式的队列 2
  35. * @return 队列 2
  36. */
  37. @Bean(name = "subQueue02")
  38. public Queue getSubQueue02(){
  39. return QueueBuilder
  40. .durable("queue_sub_02")
  41. .build();
  42. }
  43. /**
  44. * 绑定队列01
  45. * @return binding
  46. */
  47. @Bean
  48. public Binding getSubBinding01(){
  49. return BindingBuilder
  50. .bind(getSubQueue01())
  51. .to(getSubExchange())
  52. // 通配符模式 要匹配的路由键 此处为发布/订阅模式 填""就可以
  53. .with("")
  54. .noargs();
  55. }
  56. /**
  57. * 绑定队列02
  58. * @return binding
  59. */
  60. @Bean
  61. public Binding getSubBinding02(){
  62. return BindingBuilder
  63. .bind(getSubQueue02())
  64. .to(getSubExchange())
  65. // 通配符模式 要匹配的路由键 此处为发布/订阅模式 填""就可以
  66. .with("")
  67. .noargs();
  68. }
  69. }

 2.创建消费者

SubConsumer

  1. package com.model.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @Author: Haiven
  6. * @Time: 2024/4/19 16:44
  7. * @Description: TODO
  8. */
  9. @Component
  10. public class SubConsumer {
  11. @RabbitListener(queues = {"queue_sub_01"})
  12. public void subConsumer01(String msg){
  13. System.out.println("消费者 -01- 接收消息:" + msg);
  14. }
  15. @RabbitListener(queues = {"queue_sub_02"})
  16. public void subConsumer02(String msg){
  17. System.out.println("消费者 -02- 接收消息:" + msg);
  18. }
  19. }

3.创建生产者并发送消息

  1. package com.model.controller;
  2. import com.code.domain.Response;
  3. import com.model.service.RabbitService;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import javax.annotation.Resource;
  9. /**
  10. * @Author: Haiven
  11. * @Time: 2024/4/19 9:46
  12. * @Description: TODO
  13. */
  14. @RestController
  15. @RequestMapping("/producer")
  16. public class ProducerController {
  17. @Resource
  18. private RabbitService rabbitService;
  19. @GetMapping("/simple")
  20. public Response<Void> simple(String msg){
  21. boolean res = rabbitService.simple(msg);
  22. return res ? Response.success() : Response.fail();
  23. }
  24. @GetMapping("/work")
  25. public Response<Void> work(String msg){
  26. boolean res = rabbitService.work(msg);
  27. return res ? Response.success() : Response.fail();
  28. }
  29. @GetMapping("/sub")
  30. public Response<Void> sub(String msg){
  31. boolean res = rabbitService.sub(msg);
  32. return res ? Response.success() : Response.fail();
  33. }
  34. }
  1. package com.model.service.impl;
  2. import com.model.service.RabbitService;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.stereotype.Service;
  7. import javax.annotation.Resource;
  8. /**
  9. * @Author: Haiven
  10. * @Time: 2024/4/19 10:51
  11. * @Description: TODO
  12. */
  13. @Service
  14. @Slf4j
  15. public class RabbitServiceImpl implements RabbitService {
  16. @Resource
  17. private RabbitTemplate rabbitTemplate;
  18. @Value("${rabbitmq.simple.queue}")
  19. private String simpleQueue;
  20. @Value("${rabbitmq.work.queue}")
  21. private String workQueue;
  22. @Override
  23. public boolean simple(String msg) {
  24. try {
  25. rabbitTemplate.convertAndSend(simpleQueue, msg);
  26. return true;
  27. }catch (Exception e){
  28. e.printStackTrace();
  29. return false;
  30. }
  31. }
  32. @Override
  33. public boolean work(String msg) {
  34. try {
  35. rabbitTemplate.convertAndSend(workQueue, msg);
  36. return true;
  37. }catch (Exception e){
  38. e.printStackTrace();
  39. return false;
  40. }
  41. }
  42. @Override
  43. public boolean sub(String msg) {
  44. try {
  45. //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
  46. rabbitTemplate.convertAndSend("exchange_sub","", msg);
  47. return true;
  48. }catch (Exception e){
  49. e.printStackTrace();
  50. return false;
  51. }
  52. }
  53. }

4.发送消息

 发送成功

 可以发现,发布/订阅模式下,推送到交换机的消息,会被所有绑定了交换机的队列接收

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/843977
推荐阅读
相关标签
  

闽ICP备14008679号