当前位置:   article > 正文

RabbitMQ之交换机_rabbitmq交换机

rabbitmq交换机

目录

一、RabbitMQ交换机

1、交换机的由来

2、交换机类型​

2.1直连交换机(Direct  Exchange)

​2.2主题交换机(Topic  Exchange) 

​2.3扇形交换机(Fanout  Exchange)  ​

2.4首部交换机(Headers  Exchange)

2.5默认交换机(Default Exchange) 

二、RabbitMQ交换机实例讲解

        一、直连交换机讲解

                  1、先在生产者中创建一个直连交换机配置类

                  2、之后创建一个控制类,用来发信息

                  3、在消费者中定义好接受者

        二、主题交换机讲解

                  1、先在生产者中创建一个直连交换机配置类

                  2、之后创建一个控制类,用来发信息

                  3、在消费者中定义好接受者:

        三、扇形(广播)交换机讲解

                  1、先在生产者中创建一个直连交换机配置类

                  2、之后创建一个控制类,用来发信息

                  3、在消费者中定义好接受者:


一、RabbitMQ交换机

1、交换机的由来

在RabbitMQ中,生产者发送信息不会直接将消息投递到队列中,而是将消息投递到交换机中,再由交换机转发到具体的队列中,队列再将消息以推送或者拉取方式给消费进行消费

在交换机诞生了两个概念

1、路由键:

 2、绑定键:

 3、两者中的关系

2、交换机类型

2.1直连交换机(Direct  Exchange)

如图所示:

2.2主题交换机(Topic  Exchange) 

2.3扇形交换机(Fanout  Exchange)  

2.4首部交换机(Headers  Exchange)

2.5默认交换机(Default Exchange) 

二、RabbitMQ交换机实例讲解

        一、直连交换机讲解

                  1、先在生产者中创建一个直连交换机配置

DirectQueueConfig:生成队列,交换机,以及路由键,定义三个队列
  1. package com.zj.provider;
  2. import lombok.With;
  3. import org.springframework.amqp.core.Binding;
  4. import org.springframework.amqp.core.BindingBuilder;
  5. import org.springframework.amqp.core.DirectExchange;
  6. import org.springframework.amqp.core.Queue;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. @Configuration
  10. @SuppressWarnings("all")
  11. public class DirectQueueConfig {
  12. /**
  13. * 生成一个队列
  14. * @return
  15. */
  16. @Bean
  17. public Queue directQueueA(){
  18. return new Queue("directQueueA",true);
  19. }
  20. @Bean
  21. public Queue directQueueB(){
  22. return new Queue("directQueueB",true);
  23. }
  24. @Bean
  25. public Queue directQueueC(){
  26. return new Queue("directQueueC",true);
  27. }
  28. @Bean
  29. public DirectExchange directExchange(){
  30. return new DirectExchange("directExchange");
  31. }
  32. @Bean
  33. public Binding bindingA(){
  34. return BindingBuilder.bind(directQueueA()).to(directExchange()).with("AA");
  35. }
  36. @Bean
  37. public Binding bindingB(){
  38. return BindingBuilder.bind(directQueueB()).to(directExchange()).with("BB");
  39. }
  40. @Bean
  41. public Binding bindingC(){
  42. return BindingBuilder.bind(directQueueC()).to(directExchange()).with("CC");
  43. }
  44. }

                  2、之后创建一个控制类,用来发信息

DirectController:其中rabbitTemplate用来发送信息辅助类

  1. package com.zj.provider;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import java.util.Map;
  8. @RestController
  9. @RequestMapping("/sendDirect")
  10. @SuppressWarnings("all")
  11. public class DirectController {
  12. @Autowired
  13. private RabbitTemplate rabbitTemplate;
  14. @RequestMapping("/sendDirect")
  15. public String sendDirect(String routerKey) {
  16. rabbitTemplate.convertAndSend("directExchange", routerKey, "Hello world");
  17. return "yes";
  18. }
  19. }

                  3、在消费者中定义好接受者

DirectReciverA:再生成连个同样的类但是要注意的是必须要打@RabbitHandler和@RabbitListener(queues = "directQueueA")第一个是对队列处理者,第二个是队列的监听者,监听队列,不加第一个注解,消息将会接收不到
  1. package com.zj.consumer.mq;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. @SuppressWarnings("all")
  8. @Slf4j
  9. @RabbitListener(queues = "directQueueA")
  10. public class DirectReciverA {
  11. @RabbitHandler
  12. public void process(String message){
  13. log.warn("A接收到了"+message);
  14. }
  15. }

 结果运行成功:

        二、主题交换机讲解

                  1、先在生产者中创建一个直连交换机配置类

TopicQueueConfig:注意:这里面需要特定指定键

注意:必须在绑定键前加一个Topic来区分,必须介以区别,不然将会报错,因为加入了bean对象

  1. package com.zj.provider.MQ;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. @SuppressWarnings("all")
  10. public class TopicQueueConfig {
  11. private final static String KEY_A="*.orange.*";
  12. private final static String KEY_B="*.*.rabbit";
  13. private final static String KEY_C="lazy.#";
  14. /**
  15. * 生成一个队列
  16. * @return
  17. */
  18. @Bean
  19. public Queue topicQueueA(){
  20. return new Queue("topicQueueA",true);
  21. }
  22. @Bean
  23. public Queue topicQueueB(){
  24. return new Queue("topicQueueB",true);
  25. }
  26. @Bean
  27. public Queue topicQueueC(){
  28. return new Queue("topicQueueC",true);
  29. }
  30. @Bean
  31. public TopicExchange topicExchange(){
  32. return new TopicExchange("topicExchange");
  33. }
  34. @Bean
  35. public Binding topicbindingA(){
  36. return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A);
  37. }
  38. @Bean
  39. public Binding topicbindingB(){
  40. return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B);
  41. }
  42. @Bean
  43. public Binding topicbindingC(){
  44. return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C);
  45. }
  46. }

                  2、之后创建一个控制类,用来发信息

  1. @RequestMapping("/sendTopic")
  2. public String sendTopic(String routerKey) {
  3. rabbitTemplate.convertAndSend("topicExchange", routerKey, "Hello world");
  4. return "yes";
  5. }

                  3、在消费者中定义好接受者:

  1. package com.zj.consumer.mq;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. @SuppressWarnings("all")
  8. @Slf4j
  9. @RabbitListener(queues = "topicQueueA")
  10. public class TopicReciverA {
  11. @RabbitHandler
  12. public void process(String message){
  13. log.info("A接收到了"+message);
  14. }
  15. }

注意:需要进行发信息才能在RabbitMQ发现队列

 显示出队列:

 接收成功:

        三、扇形(广播)交换机讲解

 扇形交换机和其他两个交换机不一样,扇形交换机不用绑定键,因为他会进行广播,同样的在队列与交换机进行绑定时,需要加上不同的名字来进行区分

                  1、先在生产者中创建一个直连交换机配置类

  1. package com.zj.provider.MQ;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. @SuppressWarnings("all")
  10. public class FanoutQueueConfig {
  11. /**
  12. * 生成一个队列
  13. * @return
  14. */
  15. @Bean
  16. public Queue fanoutQueueA(){
  17. return new Queue("fanoutQueueA",true);
  18. }
  19. @Bean
  20. public Queue fanoutQueueB(){
  21. return new Queue("fanoutQueueB",true);
  22. }
  23. @Bean
  24. public Queue fanoutQueueC(){
  25. return new Queue("fanoutQueueC",true);
  26. }
  27. @Bean
  28. public FanoutExchange fanoutExchange(){
  29. return new FanoutExchange("fanoutExchange");
  30. }
  31. @Bean
  32. public Binding fanoutbindingA(){
  33. return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
  34. }
  35. @Bean
  36. public Binding fanoutbindingB(){
  37. return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
  38. }
  39. @Bean
  40. public Binding fanoutbindingC(){
  41. return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
  42. }
  43. }

                  2、之后创建一个控制类,用来发信息

没有绑定键,但是要写空值,不然fanoutExchange会被认为是路由键
@RequestMapping("/sendFanout")
public String sendFanout() {
    rabbitTemplate.convertAndSend("fanoutExchange", "null" ,"Hello world");
    return "yes";
}

                  3、在消费者中定义好接受者:

  1. package com.zj.consumer.mq;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. @SuppressWarnings("all")
  8. @Slf4j
  9. @RabbitListener(queues = "fanoutQueueA")
  10. public class FanoutReciverA {
  11. @RabbitHandler
  12. public void process(String message){
  13. log.info("A接收到了"+message);
  14. }
  15. }

 生产者运行效果:

 消费者接收到信息

 今天的知识就分享到这了,希望能够帮助到你! 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/541039
推荐阅读
相关标签
  

闽ICP备14008679号