赞
踩
RabbitMQ 常用的交换器类型有 Direct、Topic、Fanout、Headers 这四种。AMQP 协议里还提到另外两种类型:System 和自定义,这里不予描述。对于这四种类型下面一一阐述。
Direct 类型的交换器由路由规则很简单,它会把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中。
Direct Exchange 是 RabbitMQ 默认的交换器模式,也是最简单的模式。它根据 RoutingKey 完全匹配去寻找队列。
上面讲到 Direct 类型的交换器由规则是完全匹配 BindingKey 和 RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。Topic 类型的交换器在匹配规则上进行了扩展,它与 Direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
(1)RoutingKey 为一个点号 “.” 分隔的字符串(被点号 “.” 分隔开的每一段独立的字符串称为一个单词),如:com.rabbitmq.client、java.util.concurrent、com.hidden.client;
(2)BindingKey 和 RoutingKey 一样也是点号 “.” 分隔的字符串;
(3)BindingKey 中可以存在两种特殊字符串星号 “*” 和井号 “#”,用于做模糊匹配,其中星号 “*” 用于匹配一个单词,井号 “#”用于匹配多个规则单词(0个或者多个单词);
消息广播的模式,即将消息广播到所有绑定到它的队列中,而不考虑 RoutingKey 的值(不管路键或是路由模式)。如果设置了 RoutingKey ,则 RoutingKey 依然被忽略。
Headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。Headers 类型的交换器性能会很差,而且不实用,基本上不会看到它的存在。
下面将通过示例来讲解 RabbitMQ 的发送/接收模式。首先需要创建两个 SpringBoot 项目并整合 RabbitMQ 客户端。
(1)创建第一个 SpringBoot 项目( rabbitmq-provider 消息推送项目),项目结构如下图:
在pom.xml配置信息文件中,添加相关依赖文件:
- <!-- AMQP客户端 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- <version>2.4.1</version>
- </dependency>
在 application.yml 配置文件中配置 RabbitMQ 服务:
- spring:
- # 项目名称
- application:
- name: rabbitmq-provider
- # RabbitMQ服务配置
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
(2)创建第二个 SpringBoot 项目( rabbitmq-consumer 消息接收项目),项目结构如下图:
在pom.xml配置信息文件中,添加相关依赖文件:
- <!-- AMQP客户端 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- <version>2.4.1</version>
- </dependency>
在 application.yml 配置文件中配置 RabbitMQ 服务:
- spring:
- # 项目名称
- application:
- name: rabbitmq-consumer
- # RabbitMQ服务配置
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
使用 Direct 模式,实现消息的发送和接收队列。
(1)配置队列
在 rabbitmq-provider(消息推送项目)中,配置队列名称,并将队列交由 IoC 管理,代码如下:
- package com.pjb.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * RabbitMQ配置类
- * @author pan_junbiao
- **/
- @Configuration
- public class DirectRabbitMqConfig
- {
- public static final String DIRECT_QUEUE_NAME = "direct_queue_name"; //队列名称
- public static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name"; //交换器名称
- public static final String DIRECT_ROUTING_KEY = "direct_routing_key"; //路由键
-
- /**
- * 队列
- */
- @Bean
- public Queue directQueue()
- {
- /**
- * 创建队列,参数说明:
- * String name:队列名称。
- * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
- * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
- * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
- * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
- * 当没有生产者或者消费者使用此队列,该队列会自动删除。
- * Map<String, Object> arguments:设置队列的其他一些参数。
- */
- return new Queue(DIRECT_QUEUE_NAME, true, false, false, null);
- }
-
- /**
- * Direct交换器
- */
- @Bean
- public DirectExchange directExchange()
- {
- /**
- * 创建交换器,参数说明:
- * String name:交换器名称
- * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
- * 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
- * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
- */
- return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
- }
-
- /**
- * 绑定
- */
- @Bean
- Binding bindingDirect(DirectExchange directExchange,Queue directQueue)
- {
- //将队列和交换机绑定, 并设置用于匹配键:routingKey
- return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
- }
- }
(2)创建发送者
在 rabbitmq-provider(消息推送项目)中,创建发送者,利用 rabbitTemplate.convertAndSend() 方法发送消息,代码如下:
- package com.pjb;
-
- import com.pjb.config.DirectRabbitMqConfig;
- import org.junit.jupiter.api.Test;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * RabbitMQ测试类
- * @author pan_junbiao
- **/
- @SpringBootTest
- public class DirectRabbitMqTest
- {
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @Test
- public void sendDirectMessage()
- {
- //创建用户信息
- Map<String, Object> userMap = new HashMap<>();
- userMap.put("userId", "1");
- userMap.put("userName", "pan_junbiao的博客");
- userMap.put("blogUrl", "https://blog.csdn.net/pan_junbiao");
- userMap.put("userRemark", "您好,欢迎访问 pan_junbiao的博客");
-
- /**
- * 发送消息,参数说明:
- * String exchange:交换器名称。
- * String routingKey:路由键。
- * Object object:发送内容。
- */
- rabbitTemplate.convertAndSend(DirectRabbitMqConfig.DIRECT_EXCHANGE_NAME, DirectRabbitMqConfig.DIRECT_ROUTING_KEY, userMap);
- System.out.println("消息发送成功!");
- }
- }
(3)创建接收者
在 rabbitmq-consumer(消息接收项目)中,创建创建接收者,注意,发送者和接收者的 Queue 名称必须一致,否则不能接收消息。代码如下:
- package com.pjb.receiver;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import java.util.Map;
-
- /**
- * 接收者
- * @author pan_junbiao
- **/
- @Component
- @RabbitListener(queues="direct_queue_name")
- public class DirectReceiver
- {
- @RabbitHandler
- public void process(Map message)
- {
- System.out.println("接收者收到消息:");
- System.out.println("用户编号:" + message.get("userId"));
- System.out.println("用户名称:" + message.get("userName"));
- System.out.println("博客地址:" + message.get("blogUrl"));
- System.out.println("博客信息:" + message.get("userRemark"));
- }
- }
运行 rabbitmq-provider(消息推送项目)中的发送方法,然后运行 rabbitmq-consumer(消息接收项目),将从控制台中看到执行结果。
执行结果:
Topic 模式是 RabbitMQ 中最灵活的一种模式,可以根据 RoutingKey 自由地绑定不同的队列。
(1)配置 Topic 模式
在 rabbitmq-provider(消息推送项目)中,配置处理消息的队列,代码如下:
- package com.pjb.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * RabbitMQ配置类
- * @author pan_junbiao
- **/
- @Configuration
- public class TopicRabbitMqConfig
- {
- public static final String TOPIC_QUEUE_NAME_A = "topic_queue_name_a"; //队列名称A
- public static final String TOPIC_QUEUE_NAME_B = "topic_queue_name_b"; //队列名称B
- public static final String TOPIC_EXCHANGE_NAME = "topic_exchange_name"; //交换器名称
-
- /**
- * 队列A
- */
- @Bean
- public Queue topicQueueA()
- {
- /**
- * 创建队列,参数说明:
- * String name:队列名称。
- * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
- * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
- * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
- * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
- * 当没有生产者或者消费者使用此队列,该队列会自动删除。
- * Map<String, Object> arguments:设置队列的其他一些参数。
- */
- return new Queue(TOPIC_QUEUE_NAME_A, true);
- }
-
- /**
- * 队列B
- */
- @Bean
- public Queue topicQueueB()
- {
- /**
- * 创建队列,参数说明:
- * String name:队列名称。
- * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
- * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
- * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
- * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
- * 当没有生产者或者消费者使用此队列,该队列会自动删除。
- * Map<String, Object> arguments:设置队列的其他一些参数。
- */
- return new Queue(TOPIC_QUEUE_NAME_B, true);
- }
-
- /**
- * Topic交换器
- */
- @Bean
- TopicExchange exchange()
- {
- /**
- * 创建交换器,参数说明:
- * String name:交换器名称
- * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
- * 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
- * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
- */
- return new TopicExchange(TOPIC_EXCHANGE_NAME, true, false);
- }
-
- /**
- * 绑定1
- */
- @Bean
- Binding bindingExchangeMessage1(Queue topicQueueA, TopicExchange exchange)
- {
- //将队列和交换机绑定, 并设置用于匹配键:routingKey
- return BindingBuilder.bind(topicQueueA).to(exchange).with("topic.routingKey.a");
- }
-
- /**
- * 绑定2
- */
- @Bean
- Binding bindingExchangeMessage2(Queue topicQueueB, TopicExchange exchange)
- {
- //将队列和交换机绑定, 并设置用于匹配键:routingKey
- return BindingBuilder.bind(topicQueueB).to(exchange).with("topic.routingKey.#");
- }
- }
(2)创建发送者
在 rabbitmq-provider(消息推送项目)中,创建发送者,通过发送不同的 RoutingKey 来测试效果。代码如下:
- package com.pjb;
-
- import com.pjb.config.TopicRabbitMqConfig;
- import org.junit.jupiter.api.Test;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- /**
- * RabbitMQ测试类
- * @author pan_junbiao
- **/
- @SpringBootTest
- public class TopicRabbitMqTest
- {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void sendTopicMessage1()
- {
- String context = "pan_junbiao的博客_01";
- System.out.println("Sender:" + context);
- this.rabbitTemplate.convertAndSend(TopicRabbitMqConfig.TOPIC_EXCHANGE_NAME,"topic.routingKey.cc",context);
- }
-
- @Test
- public void sendTopicMessage2()
- {
- String context = "pan_junbiao的博客_02";
- System.out.println("Sender:" + context);
- this.rabbitTemplate.convertAndSend(TopicRabbitMqConfig.TOPIC_EXCHANGE_NAME,"topic.routingKey.a",context);
- }
-
- @Test
- public void sendTopicMessage3()
- {
- String context = "pan_junbiao的博客_03";
- System.out.println("Sender:" + context);
- this.rabbitTemplate.convertAndSend(TopicRabbitMqConfig.TOPIC_EXCHANGE_NAME,"topic.routingKey.pjb",context);
- }
- }
(3)创建接收者
在 rabbitmq-consumer(消息接收项目)中,创建接收者A。代码如下:
- package com.pjb.receiver;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 接收者
- * @author pan_junbiao
- **/
- @Component
- @RabbitListener(queues = "topic_queue_name_a")
- public class TopicReceiverA
- {
- @RabbitHandler
- public void process(String msg)
- {
- System.out.println("Topic ReceiverA:" + msg);
- }
- }
在 rabbitmq-consumer(消息接收项目)中,创建接收者B。代码如下:
- package com.pjb.receiver;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 接收者
- * @author pan_junbiao
- **/
- @Component
- @RabbitListener(queues = "topic_queue_name_b")
- public class TopicReceiverB
- {
- @RabbitHandler
- public void process(String msg)
- {
- System.out.println("Topic ReceiverB:" + msg);
- }
- }
分别运行 rabbitmq-provider(消息推送项目)中的发送方法,然后运行 rabbitmq-consumer(消息接收项目),将从控制台中看到执行结果。
执行结果1:
执行结果2:
执行结果3:
Fanout 类型的交换器可以实现广播模式。在该模式下,绑定了交换器的所有队列都能接收到这个消息。
(1)配置 Fanout 类型
在 rabbitmq-provider(消息推送项目)中,配置广播模式的对象。代码如下:
- package com.pjb.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * RabbitMQ配置类
- * @author pan_junbiao
- **/
- @Configuration
- public class FanoutRabbitMqConfig
- {
- public static final String FANOUT_QUEUE_NAME_A = "fanout_queue_name_a"; //队列名称
- public static final String FANOUT_QUEUE_NAME_B = "fanout_queue_name_b"; //队列名称
- public static final String FANOUT_QUEUE_NAME_C = "fanout_queue_name_c"; //队列名称
- public static final String FANOUT_EXCHANGE_NAME = "fanout_exchange_name"; //交换器名称
-
- @Bean
- public Queue fanoutQueueA()
- {
- return new Queue(FANOUT_QUEUE_NAME_A, true);
- }
-
- @Bean
- public Queue fanoutQueueB()
- {
- return new Queue(FANOUT_QUEUE_NAME_B, true);
- }
-
- @Bean
- public Queue fanoutQueueC()
- {
- return new Queue(FANOUT_QUEUE_NAME_C, true);
- }
-
- @Bean
- FanoutExchange fanoutExchange()
- {
- return new FanoutExchange(FANOUT_EXCHANGE_NAME);
- }
-
- @Bean
- Binding bindingExchangeA(Queue fanoutQueueA, FanoutExchange fanoutExchange)
- {
- return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
- }
-
- @Bean
- Binding bindingExchangeB(Queue fanoutQueueB, FanoutExchange fanoutExchange)
- {
- return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
- }
-
- @Bean
- Binding bindingExchangeC(Queue fanoutQueueC, FanoutExchange fanoutExchange)
- {
- return BindingBuilder.bind(fanoutQueueC).to(fanoutExchange);
- }
- }
(2)创建发送者
在 rabbitmq-provider(消息推送项目)中,创建发送者,代码如下:
- package com.pjb;
-
- import com.pjb.config.FanoutRabbitMqConfig;
- import org.junit.jupiter.api.Test;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
-
- /**
- * RabbitMQ测试类
- * @author pan_junbiao
- **/
- @SpringBootTest
- public class FanoutRabbitMqTest
- {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Test
- public void sendFanoutMessage()
- {
- String context = "您好,欢迎访问 pan_junbiao的博客";
- System.out.println("Sender:" + context);
- this.rabbitTemplate.convertAndSend(FanoutRabbitMqConfig.FANOUT_EXCHANGE_NAME, "", context);
- }
- }
(3)创建接收者
在 rabbitmq-consumer(消息接收项目)中,创建接收者A。代码如下:
- package com.pjb.receiver;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 接收者
- * @author pan_junbiao
- **/
- @Component
- @RabbitListener(queues = "fanout_queue_name_a")
- public class FanoutReceiverA
- {
- @RabbitHandler
- public void process(String message)
- {
- System.out.println("Fanout ReceiverA:" + message);
- }
- }
在 rabbitmq-consumer(消息接收项目)中,创建接收者B。代码如下:
- package com.pjb.receiver;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 接收者
- * @author pan_junbiao
- **/
- @Component
- @RabbitListener(queues = "fanout_queue_name_b")
- public class FanoutReceiverB
- {
- @RabbitHandler
- public void process(String message)
- {
- System.out.println("Fanout ReceiverB:" + message);
- }
- }
在 rabbitmq-consumer(消息接收项目)中,创建接收者C。代码如下:
- package com.pjb.receiver;
-
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 接收者
- * @author pan_junbiao
- **/
- @Component
- @RabbitListener(queues = "fanout_queue_name_c")
- public class FanoutReceiverC
- {
- @RabbitHandler
- public void process(String message)
- {
- System.out.println("Fanout ReceiverC:" + message);
- }
- }
运行 rabbitmq-provider(消息推送项目)中的发送方法,然后运行 rabbitmq-consumer(消息接收项目),将从控制台中看到执行结果。
执行结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。