当前位置:   article > 正文

RabbitMQ工作模式(5) - 主题模式

RabbitMQ工作模式(5) - 主题模式

 概念

主题模式(Topic Exchange)是 RabbitMQ 中一种灵活且强大的消息传递模式,它允许生产者根据消息的特定属性将消息发送到一个交换机,并且消费者可以根据自己的需求来接收感兴趣的消息。主题交换机根据消息的路由键和绑定队列的路由键进行模糊匹配,支持通配符 *#,从而实现了更灵活的消息路由和分发。

工作流程

  1. 生产者发送消息: 生产者将消息发送到一个主题交换机,并指定一个特定的路由键。

  2. 交换机根据路由键路由消息: 主题交换机根据消息的路由键和绑定队列的路由键进行模糊匹配。路由键可以包含多个单词,以 . 分隔,例如 stock.usd.nyseweather.usa.ca.sunny 等。

  3. 消息发送到匹配的队列: 如果消息的路由键与绑定队列的路由键完全匹配,则将消息发送到对应的队列中。如果路由键中包含通配符 *#,则可以匹配多个单词或多个级别的单词,从而实现更灵活的匹配规则。

  4. 消费者接收消息: 消费者可以根据自己的需求来选择监听匹配的队列,从而接收感兴趣的消息。消费者可以使用通配符 * 匹配一个单词,或使用 # 匹配零个或多个单词。

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开

优点

  • 灵活性:生产者可以根据消息的特定属性来发送消息,消费者可以根据自己的需求来接收感兴趣的消息。
  • 精确匹配:支持精确的路由键匹配和模糊匹配,可以根据实际需求定义复杂的路由规则。
  • 扩展性:可以根据需要动态地添加和修改绑定规则,而不需要停止消息传递服务。

主题模式适用于需要根据消息的特定属性进行灵活路由和分发的场景,例如事件处理、消息过滤、数据分析等。

 Springboot集成

示例: 系统应用程序测试的时候,会有不同的BUG,测试人员会将不同的BUG按照规范打上标签(相当于routingKey),然后发送到mq中,然后通过主题模式分发;

标签内容:bug归属.模块.等级 例如: back.order.severity

分发规则如下:

第一个消费者是前端的开发人员:处理所有严重的前端BUG:front.#

第二个消费者是后端负责订单模块开发人员:处理所有的后端order模块BUG:back.order.*

另外还有多个消费者处理不同的BUG,这里只用两个做示例

1.创建队列和交换机并绑定

 在TopicConfig中配置

  1. package com.model.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * @Author: Haiven
  7. * @Time: 2024/4/22 11:55
  8. * @Description: TODO
  9. */
  10. @Configuration
  11. public class TopicConfig {
  12. /**
  13. * 主题模式交换机
  14. * @return exchange
  15. */
  16. @Bean(name = "topicExchange")
  17. public Exchange getTopicExchange(){
  18. return ExchangeBuilder
  19. .topicExchange("exchange_topic")
  20. .build();
  21. }
  22. /**
  23. * 主题队列 01
  24. * @return queue
  25. */
  26. @Bean(name = "topicQueue01")
  27. public Queue getTopicQueue01(){
  28. return QueueBuilder
  29. .durable("queue_topic_01")
  30. .build();
  31. }
  32. /**
  33. * 主题队列 02
  34. * @return queue
  35. */
  36. @Bean(name = "topicQueue02")
  37. public Queue getTopicQueue02(){
  38. return QueueBuilder
  39. .durable("queue_topic_02")
  40. .build();
  41. }
  42. /**
  43. * 绑定队列 01
  44. * @return binding
  45. */
  46. @Bean
  47. public Binding getTopicBinding01(){
  48. return BindingBuilder
  49. .bind(getTopicQueue01())
  50. .to(getTopicExchange())
  51. //路由键 队列1接收debug级别的消息
  52. .with("front.#")
  53. .noargs();
  54. }
  55. /**
  56. * 绑定队列 02
  57. * @return binding
  58. */
  59. @Bean
  60. public Binding getTopicBinding02(){
  61. return BindingBuilder
  62. .bind(getTopicQueue02())
  63. .to(getTopicExchange())
  64. // 路由键 队列2接收info级别的消息
  65. .with("back.order.*")
  66. .noargs();
  67. }
  68. }

 主题模式的交换机类型为TopicExchange

2.创建消费者

TopicConsumer

  1. package com.model.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @Author: Haiven
  6. * @Time: 2024/4/22 10:08
  7. * @Description: TODO
  8. */
  9. @Component
  10. public class TopicConsumer {
  11. @RabbitListener(queues = {"queue_topic_01"})
  12. public void topicConsumer01(String msg){
  13. System.out.println("消费者 -01- 接收消息:" + msg);
  14. }
  15. @RabbitListener(queues = {"queue_topic_02"})
  16. public void topicConsumer02(String msg){
  17. System.out.println("消费者 -02- 接收消息:" + msg);
  18. }
  19. }

3.创建生产者并发送消息

  1. package com.model.controller;
  2. import com.code.domain.Response;
  3. import com.model.service.RabbitService;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import javax.annotation.Resource;
  9. /**
  10. * @Author: Haiven
  11. * @Time: 2024/4/19 9:46
  12. * @Description: TODO
  13. */
  14. @RestController
  15. @RequestMapping("/producer")
  16. public class ProducerController {
  17. @Resource
  18. private RabbitService rabbitService;
  19. @GetMapping("/simple")
  20. public Response<Void> simple(String msg){
  21. boolean res = rabbitService.simple(msg);
  22. return res ? Response.success() : Response.fail();
  23. }
  24. @GetMapping("/work")
  25. public Response<Void> work(String msg){
  26. boolean res = rabbitService.work(msg);
  27. return res ? Response.success() : Response.fail();
  28. }
  29. @GetMapping("/sub")
  30. public Response<Void> sub(String msg){
  31. boolean res = rabbitService.sub(msg);
  32. return res ? Response.success() : Response.fail();
  33. }
  34. @GetMapping("/routing")
  35. public Response<Void> routing(String msg, String type){
  36. boolean res = rabbitService.routing(msg, type);
  37. return res ? Response.success() : Response.fail();
  38. }
  39. @GetMapping("/topic")
  40. public Response<Void> topic(String msg, String type){
  41. boolean res = rabbitService.topic(msg, type);
  42. return res ? Response.success() : Response.fail();
  43. }
  44. }
  1. package com.model.service.impl;
  2. import com.model.service.RabbitService;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.stereotype.Service;
  7. import javax.annotation.Resource;
  8. /**
  9. * @Author: Haiven
  10. * @Time: 2024/4/19 10:51
  11. * @Description: TODO
  12. */
  13. @Service
  14. @Slf4j
  15. public class RabbitServiceImpl implements RabbitService {
  16. @Resource
  17. private RabbitTemplate rabbitTemplate;
  18. @Value("${rabbitmq.simple.queue}")
  19. private String simpleQueue;
  20. @Value("${rabbitmq.work.queue}")
  21. private String workQueue;
  22. @Override
  23. public boolean simple(String msg) {
  24. try {
  25. rabbitTemplate.convertAndSend(simpleQueue, msg);
  26. return true;
  27. }catch (Exception e){
  28. e.printStackTrace();
  29. return false;
  30. }
  31. }
  32. @Override
  33. public boolean work(String msg) {
  34. try {
  35. rabbitTemplate.convertAndSend(workQueue, msg);
  36. return true;
  37. }catch (Exception e){
  38. e.printStackTrace();
  39. return false;
  40. }
  41. }
  42. @Override
  43. public boolean sub(String msg) {
  44. try {
  45. //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
  46. rabbitTemplate.convertAndSend("exchange_sub","", msg);
  47. return true;
  48. }catch (Exception e){
  49. e.printStackTrace();
  50. return false;
  51. }
  52. }
  53. @Override
  54. public boolean routing(String msg, String type) {
  55. System.out.println("理由模式发送消息:msg="+msg+",type="+type+"");
  56. try {
  57. //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
  58. rabbitTemplate.convertAndSend("exchange_routing",type, msg);
  59. return true;
  60. }catch (Exception e){
  61. e.printStackTrace();
  62. return false;
  63. }
  64. }
  65. @Override
  66. public boolean topic(String msg, String type) {
  67. System.out.println("主题模式发送消息:msg="+msg+",type="+type+"");
  68. try {
  69. //主题模式会根据 type的通配符进行分发
  70. rabbitTemplate.convertAndSend("exchange_topic",type, msg);
  71. return true;
  72. }catch (Exception e){
  73. e.printStackTrace();
  74. return false;
  75. }
  76. }
  77. }

4.发送消息

接口调用发送消息, type字段为消息的级别

 后台接收

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/503829
推荐阅读
相关标签
  

闽ICP备14008679号