当前位置:   article > 正文

RabbitMQ交换器类型(Direct、Topic、Fanout、Headers)与消息的发送/接收模式

RabbitMQ交换器类型(Direct、Topic、Fanout、Headers)与消息的发送/接收模式

1、RabbitMQ 交换器类型

RabbitMQ 常用的交换器类型有 Direct、Topic、Fanout、Headers 这四种。AMQP 协议里还提到另外两种类型:System 和自定义,这里不予描述。对于这四种类型下面一一阐述。

1.1 Direct 类型

Direct 类型的交换器由路由规则很简单,它会把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中。

Direct Exchange 是 RabbitMQ 默认的交换器模式,也是最简单的模式。它根据 RoutingKey 完全匹配去寻找队列。

1.2 Topic 类型

上面讲到 Direct 类型的交换器由规则是完全匹配 BindingKey 和 RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。Topic 类型的交换器在匹配规则上进行了扩展,它与 Direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

(1)RoutingKey 为一个点号 “.” 分隔的字符串(被点号 “.” 分隔开的每一段独立的字符串称为一个单词),如:com.rabbitmq.client、java.util.concurrent、com.hidden.client;

(2)BindingKey 和 RoutingKey 一样也是点号 “.” 分隔的字符串;

(3)BindingKey 中可以存在两种特殊字符串星号 “*” 和井号 “#”,用于做模糊匹配,其中星号 “*” 用于匹配一个单词,井号 “#”用于匹配多个规则单词(0个或者多个单词);

1.3 Fanout 类型

消息广播的模式,即将消息广播到所有绑定到它的队列中,而不考虑 RoutingKey 的值(不管路键或是路由模式)。如果设置了 RoutingKey ,则 RoutingKey 依然被忽略。

1.4 Headers 类型

Headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。Headers 类型的交换器性能会很差,而且不实用,基本上不会看到它的存在。

 

2、RabbitMQ 消息的发送/接收模式

下面将通过示例来讲解 RabbitMQ 的发送/接收模式。首先需要创建两个 SpringBoot 项目并整合 RabbitMQ 客户端。

(1)创建第一个 SpringBoot 项目( rabbitmq-provider 消息推送项目),项目结构如下图:

在pom.xml配置信息文件中,添加相关依赖文件:

  1. <!-- AMQP客户端 -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. <version>2.4.1</version>
  6. </dependency>

在 application.yml 配置文件中配置 RabbitMQ 服务:

  1. spring:
  2. # 项目名称
  3. application:
  4. name: rabbitmq-provider
  5. # RabbitMQ服务配置
  6. rabbitmq:
  7. host: 127.0.0.1
  8. port: 5672
  9. username: guest
  10. password: guest

(2)创建第二个 SpringBoot 项目( rabbitmq-consumer 消息接收项目),项目结构如下图:

在pom.xml配置信息文件中,添加相关依赖文件:

  1. <!-- AMQP客户端 -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. <version>2.4.1</version>
  6. </dependency>

在 application.yml 配置文件中配置 RabbitMQ 服务:

  1. spring:
  2. # 项目名称
  3. application:
  4. name: rabbitmq-consumer
  5. # RabbitMQ服务配置
  6. rabbitmq:
  7. host: 127.0.0.1
  8. port: 5672
  9. username: guest
  10. password: guest

2.1 实现发送和接收队列

使用 Direct 模式,实现消息的发送和接收队列。

(1)配置队列

在 rabbitmq-provider(消息推送项目)中,配置队列名称,并将队列交由 IoC 管理,代码如下:

  1. package com.pjb.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * RabbitMQ配置类
  7. * @author pan_junbiao
  8. **/
  9. @Configuration
  10. public class DirectRabbitMqConfig
  11. {
  12. public static final String DIRECT_QUEUE_NAME = "direct_queue_name"; //队列名称
  13. public static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name"; //交换器名称
  14. public static final String DIRECT_ROUTING_KEY = "direct_routing_key"; //路由键
  15. /**
  16. * 队列
  17. */
  18. @Bean
  19. public Queue directQueue()
  20. {
  21. /**
  22. * 创建队列,参数说明:
  23. * String name:队列名称。
  24. * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
  25. * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
  26. * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
  27. * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
  28. * 当没有生产者或者消费者使用此队列,该队列会自动删除。
  29. * Map<String, Object> arguments:设置队列的其他一些参数。
  30. */
  31. return new Queue(DIRECT_QUEUE_NAME, true, false, false, null);
  32. }
  33. /**
  34. * Direct交换器
  35. */
  36. @Bean
  37. public DirectExchange directExchange()
  38. {
  39. /**
  40. * 创建交换器,参数说明:
  41. * String name:交换器名称
  42. * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
  43. * 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
  44. * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
  45. */
  46. return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
  47. }
  48. /**
  49. * 绑定
  50. */
  51. @Bean
  52. Binding bindingDirect(DirectExchange directExchange,Queue directQueue)
  53. {
  54. //将队列和交换机绑定, 并设置用于匹配键:routingKey
  55. return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
  56. }
  57. }

(2)创建发送者

在 rabbitmq-provider(消息推送项目)中,创建发送者,利用 rabbitTemplate.convertAndSend() 方法发送消息,代码如下:

  1. package com.pjb;
  2. import com.pjb.config.DirectRabbitMqConfig;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. /**
  10. * RabbitMQ测试类
  11. * @author pan_junbiao
  12. **/
  13. @SpringBootTest
  14. public class DirectRabbitMqTest
  15. {
  16. @Autowired
  17. RabbitTemplate rabbitTemplate;
  18. @Test
  19. public void sendDirectMessage()
  20. {
  21. //创建用户信息
  22. Map<String, Object> userMap = new HashMap<>();
  23. userMap.put("userId", "1");
  24. userMap.put("userName", "pan_junbiao的博客");
  25. userMap.put("blogUrl", "https://blog.csdn.net/pan_junbiao");
  26. userMap.put("userRemark", "您好,欢迎访问 pan_junbiao的博客");
  27. /**
  28. * 发送消息,参数说明:
  29. * String exchange:交换器名称。
  30. * String routingKey:路由键。
  31. * Object object:发送内容。
  32. */
  33. rabbitTemplate.convertAndSend(DirectRabbitMqConfig.DIRECT_EXCHANGE_NAME, DirectRabbitMqConfig.DIRECT_ROUTING_KEY, userMap);
  34. System.out.println("消息发送成功!");
  35. }
  36. }

(3)创建接收者

在 rabbitmq-consumer(消息接收项目)中,创建创建接收者,注意,发送者和接收者的 Queue 名称必须一致,否则不能接收消息。代码如下:

  1. package com.pjb.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Map;
  6. /**
  7. * 接收者
  8. * @author pan_junbiao
  9. **/
  10. @Component
  11. @RabbitListener(queues="direct_queue_name")
  12. public class DirectReceiver
  13. {
  14. @RabbitHandler
  15. public void process(Map message)
  16. {
  17. System.out.println("接收者收到消息:");
  18. System.out.println("用户编号:" + message.get("userId"));
  19. System.out.println("用户名称:" + message.get("userName"));
  20. System.out.println("博客地址:" + message.get("blogUrl"));
  21. System.out.println("博客信息:" + message.get("userRemark"));
  22. }
  23. }

运行 rabbitmq-provider(消息推送项目)中的发送方法,然后运行 rabbitmq-consumer(消息接收项目),将从控制台中看到执行结果。

执行结果:

 

2.2 实现用接收器接收多个主题

Topic 模式是 RabbitMQ 中最灵活的一种模式,可以根据 RoutingKey 自由地绑定不同的队列。

(1)配置 Topic 模式

在 rabbitmq-provider(消息推送项目)中,配置处理消息的队列,代码如下:

  1. package com.pjb.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * RabbitMQ配置类
  10. * @author pan_junbiao
  11. **/
  12. @Configuration
  13. public class TopicRabbitMqConfig
  14. {
  15. public static final String TOPIC_QUEUE_NAME_A = "topic_queue_name_a"; //队列名称A
  16. public static final String TOPIC_QUEUE_NAME_B = "topic_queue_name_b"; //队列名称B
  17. public static final String TOPIC_EXCHANGE_NAME = "topic_exchange_name"; //交换器名称
  18. /**
  19. * 队列A
  20. */
  21. @Bean
  22. public Queue topicQueueA()
  23. {
  24. /**
  25. * 创建队列,参数说明:
  26. * String name:队列名称。
  27. * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
  28. * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
  29. * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
  30. * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
  31. * 当没有生产者或者消费者使用此队列,该队列会自动删除。
  32. * Map<String, Object> arguments:设置队列的其他一些参数。
  33. */
  34. return new Queue(TOPIC_QUEUE_NAME_A, true);
  35. }
  36. /**
  37. * 队列B
  38. */
  39. @Bean
  40. public Queue topicQueueB()
  41. {
  42. /**
  43. * 创建队列,参数说明:
  44. * String name:队列名称。
  45. * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
  46. * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
  47. * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
  48. * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
  49. * 当没有生产者或者消费者使用此队列,该队列会自动删除。
  50. * Map<String, Object> arguments:设置队列的其他一些参数。
  51. */
  52. return new Queue(TOPIC_QUEUE_NAME_B, true);
  53. }
  54. /**
  55. * Topic交换器
  56. */
  57. @Bean
  58. TopicExchange exchange()
  59. {
  60. /**
  61. * 创建交换器,参数说明:
  62. * String name:交换器名称
  63. * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
  64. * 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
  65. * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
  66. */
  67. return new TopicExchange(TOPIC_EXCHANGE_NAME, true, false);
  68. }
  69. /**
  70. * 绑定1
  71. */
  72. @Bean
  73. Binding bindingExchangeMessage1(Queue topicQueueA, TopicExchange exchange)
  74. {
  75. //将队列和交换机绑定, 并设置用于匹配键:routingKey
  76. return BindingBuilder.bind(topicQueueA).to(exchange).with("topic.routingKey.a");
  77. }
  78. /**
  79. * 绑定2
  80. */
  81. @Bean
  82. Binding bindingExchangeMessage2(Queue topicQueueB, TopicExchange exchange)
  83. {
  84. //将队列和交换机绑定, 并设置用于匹配键:routingKey
  85. return BindingBuilder.bind(topicQueueB).to(exchange).with("topic.routingKey.#");
  86. }
  87. }

(2)创建发送者

在 rabbitmq-provider(消息推送项目)中,创建发送者,通过发送不同的 RoutingKey 来测试效果。代码如下:

  1. package com.pjb;
  2. import com.pjb.config.TopicRabbitMqConfig;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. /**
  8. * RabbitMQ测试类
  9. * @author pan_junbiao
  10. **/
  11. @SpringBootTest
  12. public class TopicRabbitMqTest
  13. {
  14. @Autowired
  15. private RabbitTemplate rabbitTemplate;
  16. @Test
  17. public void sendTopicMessage1()
  18. {
  19. String context = "pan_junbiao的博客_01";
  20. System.out.println("Sender:" + context);
  21. this.rabbitTemplate.convertAndSend(TopicRabbitMqConfig.TOPIC_EXCHANGE_NAME,"topic.routingKey.cc",context);
  22. }
  23. @Test
  24. public void sendTopicMessage2()
  25. {
  26. String context = "pan_junbiao的博客_02";
  27. System.out.println("Sender:" + context);
  28. this.rabbitTemplate.convertAndSend(TopicRabbitMqConfig.TOPIC_EXCHANGE_NAME,"topic.routingKey.a",context);
  29. }
  30. @Test
  31. public void sendTopicMessage3()
  32. {
  33. String context = "pan_junbiao的博客_03";
  34. System.out.println("Sender:" + context);
  35. this.rabbitTemplate.convertAndSend(TopicRabbitMqConfig.TOPIC_EXCHANGE_NAME,"topic.routingKey.pjb",context);
  36. }
  37. }

(3)创建接收者

在 rabbitmq-consumer(消息接收项目)中,创建接收者A。代码如下:

  1. package com.pjb.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * 接收者
  7. * @author pan_junbiao
  8. **/
  9. @Component
  10. @RabbitListener(queues = "topic_queue_name_a")
  11. public class TopicReceiverA
  12. {
  13. @RabbitHandler
  14. public void process(String msg)
  15. {
  16. System.out.println("Topic ReceiverA:" + msg);
  17. }
  18. }

在 rabbitmq-consumer(消息接收项目)中,创建接收者B。代码如下:

  1. package com.pjb.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * 接收者
  7. * @author pan_junbiao
  8. **/
  9. @Component
  10. @RabbitListener(queues = "topic_queue_name_b")
  11. public class TopicReceiverB
  12. {
  13. @RabbitHandler
  14. public void process(String msg)
  15. {
  16. System.out.println("Topic ReceiverB:" + msg);
  17. }
  18. }

分别运行 rabbitmq-provider(消息推送项目)中的发送方法,然后运行 rabbitmq-consumer(消息接收项目),将从控制台中看到执行结果。

执行结果1:

执行结果2:

执行结果3:

 

2.3 实现广播模式

Fanout 类型的交换器可以实现广播模式。在该模式下,绑定了交换器的所有队列都能接收到这个消息。

(1)配置 Fanout 类型

在 rabbitmq-provider(消息推送项目)中,配置广播模式的对象。代码如下:

  1. package com.pjb.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * RabbitMQ配置类
  10. * @author pan_junbiao
  11. **/
  12. @Configuration
  13. public class FanoutRabbitMqConfig
  14. {
  15. public static final String FANOUT_QUEUE_NAME_A = "fanout_queue_name_a"; //队列名称
  16. public static final String FANOUT_QUEUE_NAME_B = "fanout_queue_name_b"; //队列名称
  17. public static final String FANOUT_QUEUE_NAME_C = "fanout_queue_name_c"; //队列名称
  18. public static final String FANOUT_EXCHANGE_NAME = "fanout_exchange_name"; //交换器名称
  19. @Bean
  20. public Queue fanoutQueueA()
  21. {
  22. return new Queue(FANOUT_QUEUE_NAME_A, true);
  23. }
  24. @Bean
  25. public Queue fanoutQueueB()
  26. {
  27. return new Queue(FANOUT_QUEUE_NAME_B, true);
  28. }
  29. @Bean
  30. public Queue fanoutQueueC()
  31. {
  32. return new Queue(FANOUT_QUEUE_NAME_C, true);
  33. }
  34. @Bean
  35. FanoutExchange fanoutExchange()
  36. {
  37. return new FanoutExchange(FANOUT_EXCHANGE_NAME);
  38. }
  39. @Bean
  40. Binding bindingExchangeA(Queue fanoutQueueA, FanoutExchange fanoutExchange)
  41. {
  42. return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
  43. }
  44. @Bean
  45. Binding bindingExchangeB(Queue fanoutQueueB, FanoutExchange fanoutExchange)
  46. {
  47. return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
  48. }
  49. @Bean
  50. Binding bindingExchangeC(Queue fanoutQueueC, FanoutExchange fanoutExchange)
  51. {
  52. return BindingBuilder.bind(fanoutQueueC).to(fanoutExchange);
  53. }
  54. }

(2)创建发送者

在 rabbitmq-provider(消息推送项目)中,创建发送者,代码如下:

  1. package com.pjb;
  2. import com.pjb.config.FanoutRabbitMqConfig;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. /**
  8. * RabbitMQ测试类
  9. * @author pan_junbiao
  10. **/
  11. @SpringBootTest
  12. public class FanoutRabbitMqTest
  13. {
  14. @Autowired
  15. private RabbitTemplate rabbitTemplate;
  16. @Test
  17. public void sendFanoutMessage()
  18. {
  19. String context = "您好,欢迎访问 pan_junbiao的博客";
  20. System.out.println("Sender:" + context);
  21. this.rabbitTemplate.convertAndSend(FanoutRabbitMqConfig.FANOUT_EXCHANGE_NAME, "", context);
  22. }
  23. }

(3)创建接收者

在 rabbitmq-consumer(消息接收项目)中,创建接收者A。代码如下:

  1. package com.pjb.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * 接收者
  7. * @author pan_junbiao
  8. **/
  9. @Component
  10. @RabbitListener(queues = "fanout_queue_name_a")
  11. public class FanoutReceiverA
  12. {
  13. @RabbitHandler
  14. public void process(String message)
  15. {
  16. System.out.println("Fanout ReceiverA:" + message);
  17. }
  18. }

在 rabbitmq-consumer(消息接收项目)中,创建接收者B。代码如下:

  1. package com.pjb.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * 接收者
  7. * @author pan_junbiao
  8. **/
  9. @Component
  10. @RabbitListener(queues = "fanout_queue_name_b")
  11. public class FanoutReceiverB
  12. {
  13. @RabbitHandler
  14. public void process(String message)
  15. {
  16. System.out.println("Fanout ReceiverB:" + message);
  17. }
  18. }

 在 rabbitmq-consumer(消息接收项目)中,创建接收者C。代码如下:

  1. package com.pjb.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * 接收者
  7. * @author pan_junbiao
  8. **/
  9. @Component
  10. @RabbitListener(queues = "fanout_queue_name_c")
  11. public class FanoutReceiverC
  12. {
  13. @RabbitHandler
  14. public void process(String message)
  15. {
  16. System.out.println("Fanout ReceiverC:" + message);
  17. }
  18. }

运行 rabbitmq-provider(消息推送项目)中的发送方法,然后运行 rabbitmq-consumer(消息接收项目),将从控制台中看到执行结果。

执行结果:

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/399212
推荐阅读
相关标签
  

闽ICP备14008679号