赞
踩
真正生产环境都会经过exchange(交换机)来发送消息,而不是直接发送到队列,交换机的类型有以下三种:Fanout:广播;Direct:定向;Topic: 话题。
作用:接收publisher发送的消息;将消息按照规则路由到与之绑定的队列。
Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式。
例如,在支付业务中,使用广播模式。每个微服务都绑定一个消息队列,用在用户支付成功后,向Fanout交换机发送一条消息,订单服务,积分服务等都会监听各自的消息队列,保证每个服务都可以消费该消息。
案例
1、在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
2、在RabbitMQ控制台中,声明交换机hmall.fanqut,将两个队列与其绑定3、在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
4、在publisher中编写测试方法,向hmall.fanout发送消息
第一步,发送消息
- @Test
- void testSendFanout(){
- String exchangeName = "hmall.fanout";
- String msg = "hello,everyone";
- rabbitTemplate.convertAndSend(exchangeName,null,msg);
- }
第二部:接收消息
- @RabbitListener(queues = "fanout.queue1")
- public void listFanoutQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1收到了fanout.queue1的消息:【" + msg + "】");
- }
-
- @RabbitListener(queues = "fanout.queue2")
- public void listFanoutQueue2(String msg) throws InterruptedException {
- System.out.println("消费者2收到了fanout.queue2的消息:【" + msg + "】");
- }
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与RoutingKey一致的队列
案例
1、在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
2、在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定
3、在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
4、在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息
第一步:将队列与交换机绑定,设置不同的 BindingKey(RoutingKey)
第二步:发送消息
- @Test
- void testSendDirect(){
- String exchangeName = "hmall.direct";
- String msg1 = "红色警报!!";
- String msg2 = "蓝色警报!!";
- rabbitTemplate.convertAndSend(exchangeName,"red",msg1);
- rabbitTemplate.convertAndSend(exchangeName,"blue",msg2);
- }
第三步:接收消息
- @RabbitListener(queues = "direct.queue1")
- public void listDirectQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1收到了direct.queue1的消息:【" + msg + "】");
- }
-
- @RabbitListener(queues = "direct.queue2")
- public void listDirectQueue2(String msg) throws InterruptedException {
- System.out.println("消费者2收到了direct.queue2的消息:【" + msg + "】");
- }
Topic Exchange与Direct Exchange类似,区别在于routingKey可以是多个单词的列表,并且以"."分割。
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
案例
1、在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2
2、在RabbitMQ控制台中,声明交换机hmall.topic ,将两个队列与其绑定
3、在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
4、在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息
第一步:将队列与交换机绑定,设置不同的 BindingKey(RoutingKey)
第二步:发送消息
- @Test
- void testSendTopic(){
- String exchangeName = "hmall.topic";
- String msg1 = "新闻!!";
- String msg2 = "中国的新闻!!";
- rabbitTemplate.convertAndSend(exchangeName,"japan.news",msg1);
- rabbitTemplate.convertAndSend(exchangeName,"china.news",msg2);
- }
第三步:接收消息
- @RabbitListener(queues = "topic.queue1")
- public void listTopicQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1收到了topic.queue1的消息:【" + msg + "】");
- }
-
- @RabbitListener(queues = "topic.queue2")
- public void listTopicQueue2(String msg) throws InterruptedException {
- System.out.println("消费者2收到了topic.queue2的消息:【" + msg + "】");
- }
Topic交换机接收的消息RoutingKey可以是多个单词,以"."分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词
pringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
Queue:用于声明队列,可以用工厂类QueueBuilder构建
Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
交换机的实现
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class FanoutConfiguration {
- @Bean
- public FanoutExchange fanoutExchange(){
- //ExchangeBuilder.fanoutExchange("hmall.fanout2").build();//通过ExchangeBuilder构建
- return new FanoutExchange("hmall.fanout2");//直接new一个
- }
-
- @Bean
- public Queue fanoutQueue3(){
- //QueueBuilder.durable("fanout.queue3").build();//持久化
- return new Queue("fanout.queue3");
- }
-
- @Bean
- public Queue fanoutQueue4(){
- //QueueBuilder.durable("fanout.queue3").build();//持久化
- return new Queue("fanout.queue4");
- }
- @Bean
- public Binding fanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
- }
-
- @Bean
- public Binding fanoutBinding4(){
- return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange()); //如果bean已经创建,直接得到bean
- }
- }
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class DirectConfiguration {
- @Bean
- public DirectExchange directExchange(){
-
- return new DirectExchange("hmall.direct");//直接new一个
- }
-
- @Bean
- public Queue directQueue1(){
- //QueueBuilder.durable("fanout.queue3").build();//持久化
- return new Queue("direct.queue1");
- }
-
- @Bean
- public Binding directQueue1BindingRed(Queue directQueue1,DirectExchange directExchange){
- return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
- }
- @Bean
- public Binding directQueue1BindingBlue(Queue directQueue1,DirectExchange directExchange){
- return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
- }
-
-
- @Bean
- public Queue directQueue2(){
- //QueueBuilder.durable("fanout.queue3").build();//持久化
- return new Queue("direct.queue2");
- }
-
- @Bean
- public Binding directQueue2BindingRed(Queue directQueue2,DirectExchange directExchange){
- return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
- }
- @Bean
- public Binding directQueue2BindingYellow(Queue directQueue2,DirectExchange directExchange){
- return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
- }
-
- }
可以看出声明direct交换机和队列之间的绑定关系代码冗余度很高。因此Spring AMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式。
使用@RabbitListener注解来声明队列和交换机,代码简洁直观。
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue1",durable = "true"),
- exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
- key = {"red","blue"}
- ))
- public void listDirectQueue1(String msg) throws InterruptedException {
- System.out.println("消费者1收到了direct.queue1的消息:【" + msg + "】");
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "direct.queue2",durable = "true"),
- exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
- key = {"red","yellow"}
- ))
- public void listDirectQueue2(String msg) throws InterruptedException {
- System.out.println("消费者2收到了direct.queue2的消息:【" + msg + "】");
- }
案例:利用Spring AMQP发送对象类型的消息
声明一个队列,名为object.queue
编写单元测试,向队列中直接发送一条消息,消息类型为Map在控制台查看消息,总结你能发现的问题
Spring的对消息对象的处理是org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。存在下列问题:
1、JDK的序列化有安全风险
2、JDK序列化的消息太大
3、JDK序列化的消息可读性差
为了解决上述问题,采用JSON序列化代替默认的JDK序列化。
第一步:在publisher和consumer中都要引入jackson依赖
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- </dependency>
第二步,在publisher和consumer的启动类中都要配置MessageConverter
- @Bean
- public MessageConverter jacksonMessageConvertor(){
- return new Jackson2JsonMessageConverter();
- }
第三步,发送消息
- @Test
- void testSendObject(){
- Map<String, Object> msg = new HashMap<>(2);
- msg.put("name","jack");
- msg.put("age",21);
- rabbitTemplate.convertAndSend("object.queue",msg);
- }
第四步,接收消息
- @RabbitListener(queues = "object.queue")
- public void listenObject(Map<String, Object> msg) throws InterruptedException {
- System.out.println("消费者1收到了object.queue1的消息:【" + msg + "】");
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。