当前位置:   article > 正文

RabbitMQ基础(三)之交换机_fanout 交换机

fanout 交换机

真正生产环境都会经过exchange(交换机)来发送消息,而不是直接发送到队列,交换机的类型有以下三种:Fanout:广播Direct:定向Topic: 话题

作用:接收publisher发送的消息;将消息按照规则路由到与之绑定的队列。

 一、Fanout交换机

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式。

例如,在支付业务中,使用广播模式。每个微服务都绑定一个消息队列,用在用户支付成功后,向Fanout交换机发送一条消息,订单服务,积分服务等都会监听各自的消息队列,保证每个服务都可以消费该消息。

案例

1、在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
2、在RabbitMQ控制台中,声明交换机hmall.fanqut,将两个队列与其绑定

3、在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

4、在publisher中编写测试方法,向hmall.fanout发送消息

第一步,发送消息

  1. @Test
  2. void testSendFanout(){
  3. String exchangeName = "hmall.fanout";
  4. String msg = "hello,everyone";
  5. rabbitTemplate.convertAndSend(exchangeName,null,msg);
  6. }

第二部:接收消息

  1. @RabbitListener(queues = "fanout.queue1")
  2. public void listFanoutQueue1(String msg) throws InterruptedException {
  3. System.out.println("消费者1收到了fanout.queue1的消息:【" + msg + "】");
  4. }
  5. @RabbitListener(queues = "fanout.queue2")
  6. public void listFanoutQueue2(String msg) throws InterruptedException {
  7. System.out.println("消费者2收到了fanout.queue2的消息:【" + msg + "】");
  8. }

 二、Direct交换机

 Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

每一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey

Exchange将消息路由到BindingKey与RoutingKey一致的队列

案例

1、在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2

2、在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定

3、在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

4、在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息

第一步:将队列与交换机绑定,设置不同的 BindingKey(RoutingKey)

 

 第二步:发送消息

  1. @Test
  2. void testSendDirect(){
  3. String exchangeName = "hmall.direct";
  4. String msg1 = "红色警报!!";
  5. String msg2 = "蓝色警报!!";
  6. rabbitTemplate.convertAndSend(exchangeName,"red",msg1);
  7. rabbitTemplate.convertAndSend(exchangeName,"blue",msg2);
  8. }

第三步:接收消息

  1. @RabbitListener(queues = "direct.queue1")
  2. public void listDirectQueue1(String msg) throws InterruptedException {
  3. System.out.println("消费者1收到了direct.queue1的消息:【" + msg + "】");
  4. }
  5. @RabbitListener(queues = "direct.queue2")
  6. public void listDirectQueue2(String msg) throws InterruptedException {
  7. System.out.println("消费者2收到了direct.queue2的消息:【" + msg + "】");
  8. }

 三、Topic交换机

Topic Exchange与Direct Exchange类似,区别在于routingKey可以是多个单词的列表,并且以"."分割

Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词

案例

1、在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2

2、在RabbitMQ控制台中,声明交换机hmall.topic ,将两个队列与其绑定

3、在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

4、在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息

 第一步:将队列与交换机绑定,设置不同的 BindingKey(RoutingKey)

 第二步:发送消息

  1. @Test
  2. void testSendTopic(){
  3. String exchangeName = "hmall.topic";
  4. String msg1 = "新闻!!";
  5. String msg2 = "中国的新闻!!";
  6. rabbitTemplate.convertAndSend(exchangeName,"japan.news",msg1);
  7. rabbitTemplate.convertAndSend(exchangeName,"china.news",msg2);
  8. }

第三步:接收消息

  1. @RabbitListener(queues = "topic.queue1")
  2. public void listTopicQueue1(String msg) throws InterruptedException {
  3. System.out.println("消费者1收到了topic.queue1的消息:【" + msg + "】");
  4. }
  5. @RabbitListener(queues = "topic.queue2")
  6. public void listTopicQueue2(String msg) throws InterruptedException {
  7. System.out.println("消费者2收到了topic.queue2的消息:【" + msg + "】");
  8. }

四、Direct交换机和Topic交换机的差异

Topic交换机接收的消息RoutingKey可以是多个单词,以"."分割

Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词

 五、用Java代码声明队列交换机

 pringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

Queue:用于声明队列,可以用工厂类QueueBuilder构建

Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建

Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

 交换机的实现

5.1、基于Bean声明Fanout交换机和队列 

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. @Configuration
  5. public class FanoutConfiguration {
  6. @Bean
  7. public FanoutExchange fanoutExchange(){
  8. //ExchangeBuilder.fanoutExchange("hmall.fanout2").build();//通过ExchangeBuilder构建
  9. return new FanoutExchange("hmall.fanout2");//直接new一个
  10. }
  11. @Bean
  12. public Queue fanoutQueue3(){
  13. //QueueBuilder.durable("fanout.queue3").build();//持久化
  14. return new Queue("fanout.queue3");
  15. }
  16. @Bean
  17. public Queue fanoutQueue4(){
  18. //QueueBuilder.durable("fanout.queue3").build();//持久化
  19. return new Queue("fanout.queue4");
  20. }
  21. @Bean
  22. public Binding fanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){
  23. return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
  24. }
  25. @Bean
  26. public Binding fanoutBinding4(){
  27. return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange()); //如果bean已经创建,直接得到bean
  28. }
  29. }

 5.2、基于Bean声明Direct交换机和队列 

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. @Configuration
  5. public class DirectConfiguration {
  6. @Bean
  7. public DirectExchange directExchange(){
  8. return new DirectExchange("hmall.direct");//直接new一个
  9. }
  10. @Bean
  11. public Queue directQueue1(){
  12. //QueueBuilder.durable("fanout.queue3").build();//持久化
  13. return new Queue("direct.queue1");
  14. }
  15. @Bean
  16. public Binding directQueue1BindingRed(Queue directQueue1,DirectExchange directExchange){
  17. return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
  18. }
  19. @Bean
  20. public Binding directQueue1BindingBlue(Queue directQueue1,DirectExchange directExchange){
  21. return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
  22. }
  23. @Bean
  24. public Queue directQueue2(){
  25. //QueueBuilder.durable("fanout.queue3").build();//持久化
  26. return new Queue("direct.queue2");
  27. }
  28. @Bean
  29. public Binding directQueue2BindingRed(Queue directQueue2,DirectExchange directExchange){
  30. return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
  31. }
  32. @Bean
  33. public Binding directQueue2BindingYellow(Queue directQueue2,DirectExchange directExchange){
  34. return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
  35. }
  36. }

 可以看出声明direct交换机和队列之间的绑定关系代码冗余度很高。因此Spring AMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式。

 5.3、基于@RabbitListener注解来声明队列和交换机

 使用@RabbitListener注解来声明队列和交换机,代码简洁直观。

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "direct.queue1",durable = "true"),
  3. exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
  4. key = {"red","blue"}
  5. ))
  6. public void listDirectQueue1(String msg) throws InterruptedException {
  7. System.out.println("消费者1收到了direct.queue1的消息:【" + msg + "】");
  8. }
  9. @RabbitListener(bindings = @QueueBinding(
  10. value = @Queue(name = "direct.queue2",durable = "true"),
  11. exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
  12. key = {"red","yellow"}
  13. ))
  14. public void listDirectQueue2(String msg) throws InterruptedException {
  15. System.out.println("消费者2收到了direct.queue2的消息:【" + msg + "】");
  16. }

六、消息转换器

案例:利用Spring AMQP发送对象类型的消息

声明一个队列,名为object.queue
编写单元测试,向队列中直接发送一条消息,消息类型为Map

在控制台查看消息,总结你能发现的问题

Spring的对消息对象的处理是org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。存在下列问题:
1、JDK的序列化有安全风险
2、JDK序列化的消息太大
3、JDK序列化的消息可读性差

为了解决上述问题,采用JSON序列化代替默认的JDK序列化。

第一步:在publisher和consumer中都要引入jackson依赖

  1. <dependency>
  2. <groupId>com.fasterxml.jackson.dataformat</groupId>
  3. <artifactId>jackson-dataformat-xml</artifactId>
  4. </dependency>

第二步,在publisher和consumer的启动类中都要配置MessageConverter

  1. @Bean
  2. public MessageConverter jacksonMessageConvertor(){
  3. return new Jackson2JsonMessageConverter();
  4. }

第三步,发送消息

  1. @Test
  2. void testSendObject(){
  3. Map<String, Object> msg = new HashMap<>(2);
  4. msg.put("name","jack");
  5. msg.put("age",21);
  6. rabbitTemplate.convertAndSend("object.queue",msg);
  7. }

第四步,接收消息

  1. @RabbitListener(queues = "object.queue")
  2. public void listenObject(Map<String, Object> msg) throws InterruptedException {
  3. System.out.println("消费者1收到了object.queue1的消息:【" + msg + "】");
  4. }

 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号