赞
踩
2.实战:RabbitMQ四种消息交换机类型
2.1 Direct Exchange
2.2 Fanout Exchange
2.3 Topic Exchange
2.4 Header Exchange
3.总结
RabbitMQ安装不必我多说,linux和windows环境下都可以安装,我是在windows环境下进行安装的,安装RabbitMQ之前需要先安装Erlang,因为Rabbit MQ 是建立在Erlang OTP平台上,而且安装Erlang 时要注意安装的RabbityMQ 所依赖的Erlang版本。
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而且支持跨平台。
下图主要介绍了消息的推送和接收的过程:
生产者:创建需要发送的消息
queues:存储消息的队列
交换机:将消息转发到队列上
消费者:以监听的方式获取对应的消息
(1)创建spring boot项目,引入web和rabbitmq依赖。
pom.xml如下
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
(2)在application.properties文件中配置rabbitMq的基本信息,数据传输的默认端口号是5672
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。
创建基于direct交换机的rabbitmq配置类。
@Configuration public class RabbitDirectConfig { public final static String DIRECTNAME = "direct"; //注入队列 @Bean Queue queue() { return new Queue("hello.direct"); } //注入交换机 @Bean DirectExchange directExchange() { //durable:重启后是否有效 autodelete:长期未使用是否删除掉 return new DirectExchange(DIRECTNAME, true, false); } //将队列和交换机进行绑定 @Bean Binding binding() { return BindingBuilder.bind(queue()).to(directExchange()).with("direct"); } }
创建receiver类接收消息
@Component
public class DirectReceiver {
@RabbitListener(queues = "hello.direct")
public void handler1(String msg){
System.out.println("handler1====="+msg);
}
}
编写测试类
//注入Rabbitmq发送消息的模板对象
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
//routingKey需要转发到对应队列的名字 object 消息对象
rabbitTemplate.convertAndSend("hello.direct", "hello fff!haa");
}
启动springboot项目,然后运行测试类进行消息的发送,receiver会将收到的消息打印在控制台
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
创建基于Fanout交换机的rabbitmq配置类。
@Configuration public class RabbitFanoutConfig { //创建两个队列并绑定到fanout交换机上 public static final String FANOUTNAME = "fanout"; @Bean Queue queueOne() { return new Queue("queue-one"); } @Bean Queue queueTwo() { return new Queue("queue-two"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUTNAME, true, false); } @Bean Binding bindingOne() { return BindingBuilder.bind(queueOne()).to(fanoutExchange()); } @Bean Binding bindingTwo() { return BindingBuilder.bind(queueTwo()).to(fanoutExchange()); } }
创建receiver类接收消息
@Component
public class FanoutReceiver {
@RabbitListener(queues = "queue-one")
public void handler1(String msg) {
System.out.println("FanoutReceiver:handler1:" + msg);
}
@RabbitListener(queues = "queue-two")
public void handler2(String msg) {
System.out.println("FanoutReceiver:handler2:" + msg);
}
}
编写测试类,发送一条消息,fanoutexchange会将此消息转发给绑定到它身上的所有队列
@Test
public void test1() {
rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null,"hello fanout!");
}
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单地介绍下规则:
* (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为 .TT. 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
创建基于Topic交换机的rabbitmq配置类。
@Configuration public class RabbitTopicConfig { //创建三个队列,绑定交换机时可以规定通配规则 public static final String TOPICNAME = "topic"; @Bean TopicExchange topicExchange() { return new TopicExchange(TOPICNAME, true, false); } @Bean Queue xiaomi() { return new Queue("xiaomi"); } @Bean Queue huawei() { return new Queue("huawei"); } @Bean Queue phone() { return new Queue("phone"); } @Bean Binding xiaomiBinding() { return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#"); } @Bean Binding huaweiBinding() { return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#"); } @Bean Binding phoneBinding() { return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#"); } }
创建receiver类接收消息
@Component public class TopicReceiver { @RabbitListener(queues = "xiaomi") public void handler1(String msg) { System.out.println("TopicReceiver:handler1:" + msg); } @RabbitListener(queues = "huawei") public void handler2(String msg) { System.out.println("TopicReceiver:handler2:" + msg); } @RabbitListener(queues = "phone") public void handler3(String msg) { System.out.println("TopicReceiver:handler3:" + msg); } }
编写测试类
@Test
public void test2() {
//匹配到xiaomi的消息队列
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "xiaomi.news", "小米新闻");
//匹配到phone的消息队列
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "vivo.phone", "vivo 手机");
//匹配到huawei和phone的消息队列
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "huawei.phone", "华为手机");
}
前3条信息打印在项目的控制台,后一条信息打印在测试的控制台,共有4条信息被打印
header交换机取消了routingkey,使用header中的key/value来匹配队列
创建基于Header交换机的rabbitmq配置类。
@Configuration public class RabbitHeaderConfig { public static final String HEADERNAME = "header"; @Bean HeadersExchange headersExchange() { return new HeadersExchange(HEADERNAME, true, false); } @Bean Queue queueName() { return new Queue("name-queue"); } @Bean Queue queueAge() { return new Queue("age-queue"); } @Bean Binding bindingName() { Map<String, Object> map = new HashMap<>(); map.put("name", "李白"); //whereAny 只要匹配到map中任意一个key-value即可 ,扩展:whereAll必须具有map中所有的key-value return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match(); } @Bean Binding bindingAge() { //where 只要age这个key存在即可,value不论取何值 return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists(); } }
创建receiver类接收消息
@Component
public class HeaderReceiver {
@RabbitListener(queues = "name-queue")
public void handler1(byte[] msg) {
System.out.println("HeaderReceiver:handler1:" + new String(msg, 0, msg.length));
}
@RabbitListener(queues = "age-queue")
public void handler2(byte[] msg) {
System.out.println("HeaderReceiver:handler2:" + new String(msg, 0, msg.length));
}
}
编写测试类
@Test
public void test3() {
//创建org.springframework.amqp.core.Message,并添加消息的头部
Message nameMsg = MessageBuilder.withBody("hello javaboy !".getBytes()).setHeader("name","李白").build();
Message ageMsg = MessageBuilder.withBody("hello 99 !".getBytes()).setHeader("age","99").build();
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);
}
至此,rabbitMq四种交换机类型全部介绍完了。
springboot整合rabbitMq也算入了个门,但是rabbitMq的应用场景比如异步、解耦等还需要在以后的项目开发中探索!!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。