当前位置:   article > 正文

RabbitMQ工作模式(4) - 路由模式

RabbitMQ工作模式(4) - 路由模式

概念

路由模式(Routing)是 RabbitMQ 中的一种消息传递模式,也称为直连模式。它允许生产者将消息发送到一个交换机,并指定一个或多个路由键(routing key),交换机根据路由键将消息路由到与之匹配的队列中。这样消费者只需关注感兴趣的消息,而不需要接收所有的消息。

工作流程

  1. 生产者发送消息: 生产者将消息发送到一个交换机,并指定一个或多个路由键。

  2. 交换机根据路由键路由消息: 交换机根据消息的路由键将消息发送到与之匹配的队列中。匹配规则可以由交换机的类型和绑定规则决定。

  3. 消费者监听队列: 消费者可以选择监听特定的队列,或者多个队列,以接收他们感兴趣的消息。

  4. 消息处理: 消费者从队列中接收消息,并进行相应的处理。只有与队列绑定的交换机根据消息的路由键将消息发送到该队列中。

特点

  • 灵活路由:生产者可以根据需要指定不同的路由键来发送消息,交换机根据路由键将消息路由到不同的队列。
  • 定向传递:消息只会被发送到与之匹配的队列中,消费者只需关注他们感兴趣的消息,而不需要接收所有的消息。
  • 路由规则:可以根据实际需求定义不同的路由规则,例如根据消息的类型、内容、优先级等进行路由。

路由模式适用于需要根据不同的消息属性将消息路由到不同队列的场景,例如消息分类、事件处理、分布式系统等。

Springboot集成

1.创建队列和交换机并绑定

在RoutingConfig文件中配置

  1. package com.model.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * @Author: Haiven
  7. * @Time: 2024/4/22 10:09
  8. * @Description: TODO
  9. */
  10. @Configuration
  11. public class RoutingConfig {
  12. /**
  13. * 路由模式交换机
  14. * @return exchange
  15. */
  16. @Bean(name = "routingExchange")
  17. public Exchange getRoutingExchange(){
  18. return ExchangeBuilder
  19. .directExchange("exchange_routing")
  20. .build();
  21. }
  22. /**
  23. * 路由队列 01
  24. * @return queue
  25. */
  26. @Bean(name = "routingQueue01")
  27. public Queue getRoutingQueue01(){
  28. return QueueBuilder
  29. .durable("queue_routing_01")
  30. .build();
  31. }
  32. /**
  33. * 路由队列 02
  34. * @return queue
  35. */
  36. @Bean(name = "routingQueue02")
  37. public Queue getRoutingQueue02(){
  38. return QueueBuilder
  39. .durable("queue_routing_02")
  40. .build();
  41. }
  42. /**
  43. * 绑定队列 01
  44. * @return binding
  45. */
  46. @Bean
  47. public Binding getRoutingBinding01(){
  48. return BindingBuilder
  49. .bind(getRoutingQueue01())
  50. .to(getRoutingExchange())
  51. //路由键 队列1接收debug级别的消息
  52. .with("debug")
  53. .noargs();
  54. }
  55. /**
  56. * 绑定队列 02
  57. * @return binding
  58. */
  59. @Bean
  60. public Binding getRoutingBinding02(){
  61. return BindingBuilder
  62. .bind(getRoutingQueue02())
  63. .to(getRoutingExchange())
  64. // 路由键 队列2接收info级别的消息
  65. .with("info")
  66. .noargs();
  67. }
  68. /**
  69. * 绑定队列 01
  70. * @return binding
  71. */
  72. @Bean
  73. public Binding getRoutingBinding03(){
  74. return BindingBuilder
  75. .bind(getRoutingQueue01())
  76. .to(getRoutingExchange())
  77. //路由键 队列1接收debug级别的消息
  78. .with("err")
  79. .noargs();
  80. }
  81. }

 !!!这里创建的交换机类型为:DirectExchange,如果交换机内容错误,会导致消息错误的分发

  1. Direct Exchange(直连交换机): 直连交换机将消息的路由键与绑定队列的路由键进行精确匹配,只有当消息的路由键与绑定队列的路由键完全相同时,才会将消息路由到对应的队列。

  2. Fanout Exchange(扇出交换机): 扇出交换机将消息广播到所有与之绑定的队列,无视消息的路由键。这种模式适用于需要将消息广播给多个消费者的场景。

  3. Topic Exchange(主题交换机): 主题交换机根据消息的路由键与绑定队列的路由键进行模糊匹配,支持通配符 *#* 表示匹配一个单词,# 表示匹配零个或多个单词。这种模式适用于需要根据消息的特定属性进行路由的场景。

  4. Headers Exchange(头交换机): 头交换机根据消息的头部属性进行匹配,而不是路由键。在绑定队列时,可以指定匹配的头部属性和值,只有当消息的头部属性和值与绑定规则完全匹配时,才会将消息发送到对应的队列。

  5. Default Exchange(默认交换机): 默认交换机是 RabbitMQ 的默认交换机,它将消息的路由键与队列的名称进行匹配,如果消息的路由键与队列的名称完全匹配,则将消息路由到该队列中。默认交换机通常以空字符串表示,不需要显示声明,可以直接使用。

2.创建消费者

RoutingConsumer

  1. package com.model.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @Author: Haiven
  6. * @Time: 2024/4/22 10:08
  7. * @Description: TODO
  8. */
  9. @Component
  10. public class RoutingConsumer {
  11. @RabbitListener(queues = {"queue_routing_01"})
  12. public void routingConsumer01(String msg){
  13. System.out.println("消费者 -01- 接收消息:" + msg);
  14. }
  15. @RabbitListener(queues = {"queue_routing_02"})
  16. public void routingConsumer02(String msg){
  17. System.out.println("消费者 -02- 接收消息:" + msg);
  18. }
  19. }

3.创建生产者并发送消息

  1. package com.model.controller;
  2. import com.code.domain.Response;
  3. import com.model.service.RabbitService;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import javax.annotation.Resource;
  9. /**
  10. * @Author: Haiven
  11. * @Time: 2024/4/19 9:46
  12. * @Description: TODO
  13. */
  14. @RestController
  15. @RequestMapping("/producer")
  16. public class ProducerController {
  17. @Resource
  18. private RabbitService rabbitService;
  19. @GetMapping("/simple")
  20. public Response<Void> simple(String msg){
  21. boolean res = rabbitService.simple(msg);
  22. return res ? Response.success() : Response.fail();
  23. }
  24. @GetMapping("/work")
  25. public Response<Void> work(String msg){
  26. boolean res = rabbitService.work(msg);
  27. return res ? Response.success() : Response.fail();
  28. }
  29. @GetMapping("/sub")
  30. public Response<Void> sub(String msg){
  31. boolean res = rabbitService.sub(msg);
  32. return res ? Response.success() : Response.fail();
  33. }
  34. @GetMapping("/routing")
  35. public Response<Void> routing(String msg, String type){
  36. boolean res = rabbitService.routing(msg, type);
  37. return res ? Response.success() : Response.fail();
  38. }
  39. }
  1. package com.model.service.impl;
  2. import com.model.service.RabbitService;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.stereotype.Service;
  7. import javax.annotation.Resource;
  8. /**
  9. * @Author: Haiven
  10. * @Time: 2024/4/19 10:51
  11. * @Description: TODO
  12. */
  13. @Service
  14. @Slf4j
  15. public class RabbitServiceImpl implements RabbitService {
  16. @Resource
  17. private RabbitTemplate rabbitTemplate;
  18. @Value("${rabbitmq.simple.queue}")
  19. private String simpleQueue;
  20. @Value("${rabbitmq.work.queue}")
  21. private String workQueue;
  22. @Override
  23. public boolean simple(String msg) {
  24. try {
  25. rabbitTemplate.convertAndSend(simpleQueue, msg);
  26. return true;
  27. }catch (Exception e){
  28. e.printStackTrace();
  29. return false;
  30. }
  31. }
  32. @Override
  33. public boolean work(String msg) {
  34. try {
  35. rabbitTemplate.convertAndSend(workQueue, msg);
  36. return true;
  37. }catch (Exception e){
  38. e.printStackTrace();
  39. return false;
  40. }
  41. }
  42. @Override
  43. public boolean sub(String msg) {
  44. try {
  45. //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
  46. rabbitTemplate.convertAndSend("exchange_sub","", msg);
  47. return true;
  48. }catch (Exception e){
  49. e.printStackTrace();
  50. return false;
  51. }
  52. }
  53. @Override
  54. public boolean routing(String msg, String type) {
  55. System.out.println("理由模式发送消息:msg="+msg+",type="+type+"");
  56. try {
  57. //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
  58. rabbitTemplate.convertAndSend("exchange_routing",type, msg);
  59. return true;
  60. }catch (Exception e){
  61. e.printStackTrace();
  62. return false;
  63. }
  64. }
  65. }

4.发送消息

接口调用发送消息, type字段为消息的级别

 后台接收

 debug级别消息只被消费者1消费

 info级别的消息只被消费者2消费

5.额外说明

上述消费者1只消费了debug级别,如果还有err级别的消息,只需在将队列1绑定err级别的消息

  1. /**
  2. * 绑定队列 01
  3. * @return binding
  4. */
  5. @Bean
  6. public Binding getRoutingBinding03(){
  7. return BindingBuilder
  8. .bind(getRoutingQueue01())
  9. .to(getRoutingExchange())
  10. //路由键 队列1接收debug级别的消息
  11. .with("err")
  12. .noargs();
  13. }

发送消息并测试

 如果某种消息级别(warn)没有被绑定,这该级别的消息会被丢弃

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/507353
推荐阅读
相关标签
  

闽ICP备14008679号