
RabbitMQ Quick Start and the Four RabbitMQ Exchange Types

Installing on Windows 10:

Installing RabbitMQ on Windows 10 (柚几哥哥's blog, CSDN)

Installing on Linux:

Downloading and installing RabbitMQ on Linux (柚几哥哥's blog, CSDN)

I. Basic Usage

1. Add the dependencies

    <!-- RabbitMQ -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>

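2. Configure application.yml

    spring:
      # RabbitMQ
      rabbitmq:
        # Broker address
        host: 192.168.10.100
        # Username (RabbitMQ's built-in guest user may only connect from localhost by default)
        username: guest
        # Password
        password: guest
        # Virtual host
        virtual-host: /
        # Port
        port: 5672
        listener:
          simple:
            # Minimum number of consumers
            concurrency: 10
            # Maximum number of consumers
            max-concurrency: 10
            # Each consumer handles one message at a time and receives the next only after finishing it
            prefetch: 1
            # Whether to start the listener container on startup (default: true)
            auto-startup: true
            # Requeue a message when it is rejected
            default-requeue-rejected: true
        template:
          retry:
            # Enable publish retries (default: false)
            enabled: true
            # Interval before the first retry (default: 1000 ms)
            initial-interval: 1000
            # Maximum number of attempts (default: 3)
            max-attempts: 3
            # Upper bound on the retry interval (default: 10000 ms)
            max-interval: 10000
            # Multiplier applied to the previous interval. With 2.0 and the 1000 ms initial
            # interval above, successive waits are 1 s, 2 s, 4 s, capped at max-interval
            multiplier: 1.0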

3. Write the configuration class RabbitMQConfig.java

    package com.xxxx.seckill.config;

    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Configuration
    public class RabbitMQConfig {

        // Declare a durable queue named "queue"
        @Bean
        public Queue queue() {
            return new Queue("queue", true);
        }
    }

4. Write the sender MQSender.java

    package com.xxxx.seckill.rabbitmq;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Service
    @Slf4j
    public class MQSender {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send(Object msg) {
            log.info("Sending message: " + msg);
            // No exchange given: the default exchange is used and "queue" is the routing key
            rabbitTemplate.convertAndSend("queue", msg);
        }
    }

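5. Write the receiver MQReceiver.java

    package com.xxxx.seckill.rabbitmq;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;

    /**
     * @author zyw
     * @since 1.0.0
     */
    @Service
    @Slf4j
    public class MQReceiver {

        // Consume messages from the queue named "queue"
        @RabbitListener(queues = "queue")
        public void receive(Object msg) {
            log.info("Received message: " + msg);
        }
    }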

6. Write the test endpoint in UserController.java

    /**
     * Test sending a RabbitMQ message
     */
    // mqSender is the MQSender bean injected into the controller
    @RequestMapping("/mq")
    @ResponseBody
    public void mq() {
        mqSender.send("Hello");
    }

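7. Result

Calling the /mq endpoint makes the sender log the outgoing message, and the receiver logs it once it has been consumed from the queue.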

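II. RabbitMQ Exchanges

Fanout Mode

A fanout exchange ignores the routing key; queues only need to be bound to the exchange.
Every message published to the exchange is forwarded to all queues bound to it.
Because no routing-key matching is involved, fanout is the fastest exchange type at forwarding messages.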
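The broadcast behavior can be pictured with a tiny in-memory model. This is only an illustration of the routing rule, not RabbitMQ or Spring AMQP code; the class `FanoutSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange.
class FanoutSketch {

    // Queue name -> messages delivered to that queue.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Binding needs only the queue; there is no binding key.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but ignored: every bound queue gets a copy.
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("queue_fanout01");
        exchange.bind("queue_fanout02");
        exchange.publish("", "Hello");
        System.out.println(exchange.messagesIn("queue_fanout01")); // [Hello]
        System.out.println(exchange.messagesIn("queue_fanout02")); // [Hello]
    }
}
```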

1. RabbitMQConfig.java

    package com.xxxx.seckill.config;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Configuration
    public class RabbitMQConfig {

        private static final String QUEUE01 = "queue_fanout01";
        private static final String QUEUE02 = "queue_fanout02";
        private static final String EXCHANGE = "fanoutExchange";

        @Bean
        public Queue queue01() {
            return new Queue(QUEUE01);
        }

        @Bean
        public Queue queue02() {
            return new Queue(QUEUE02);
        }

        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(EXCHANGE);
        }

        // Bind both queues to the fanout exchange; no routing key is needed
        @Bean
        public Binding binding01() {
            return BindingBuilder.bind(queue01()).to(fanoutExchange());
        }

        @Bean
        public Binding binding02() {
            return BindingBuilder.bind(queue02()).to(fanoutExchange());
        }
    }

2. MQSender.java

    package com.xxxx.seckill.rabbitmq;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Service
    @Slf4j
    public class MQSender {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send(Object msg) {
            log.info("Sending message: " + msg);
            // The routing key is left empty because a fanout exchange ignores it
            rabbitTemplate.convertAndSend("fanoutExchange", "", msg);
        }
    }

3. MQReceiver.java

    package com.xxxx.seckill.rabbitmq;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Service
    @Slf4j
    public class MQReceiver {

        @RabbitListener(queues = "queue_fanout01")
        public void receive01(Object msg) {
            log.info("QUEUE01 received message: " + msg);
        }

        @RabbitListener(queues = "queue_fanout02")
        public void receive02(Object msg) {
            log.info("QUEUE02 received message: " + msg);
        }
    }

4. UserController.java

    /**
     * Test sending a RabbitMQ message
     */
    @RequestMapping("/mq/fanout")
    @ResponseBody
    public void mq() {
        mqSender.send("Hello");
    }

5. Test

Call the mq/fanout endpoint; the exchange forwards the message to every queue bound to it.

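Direct Mode

Every message sent to a direct exchange is forwarded to the queue whose binding key equals the message's RouteKey.
Note: direct mode can use RabbitMQ's built-in default exchange, to which every queue is automatically bound under its own name, so no explicit binding is required. The RouteKey must match exactly for a queue to receive the message; otherwise the message is discarded.
Key point: when the routing key is identical to the key the queue was bound with, the message is routed to that queue.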
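The exact-match rule can be sketched as a lookup table from binding key to queues. This is a simplified in-memory illustration, not broker code; the class `DirectSketch` and its methods are hypothetical names, while the queue names and keys are the ones used in this section.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange.
class DirectSketch {

    // Binding key -> names of the queues bound with that key.
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues that receive a message; an empty list means the message is dropped.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("queue_direct01", "queue.red");
        exchange.bind("queue_direct02", "queue.green");
        System.out.println(exchange.route("queue.red"));   // [queue_direct01]
        System.out.println(exchange.route("queue.green")); // [queue_direct02]
        System.out.println(exchange.route("queue.blue"));  // [] (no exact match)
    }
}
```

The default exchange behaves like this model with one implicit binding per queue, where the binding key is the queue's own name.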

1. RabbitMQConfig.java

    package com.xxxx.seckill.config;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Configuration
    public class RabbitMQConfig {

        private static final String QUEUE01 = "queue_direct01";
        private static final String QUEUE02 = "queue_direct02";
        private static final String EXCHANGE = "directExchange";
        private static final String ROUTINGKEY01 = "queue.red";
        private static final String ROUTINGKEY02 = "queue.green";

        @Bean
        public Queue queue01() {
            return new Queue(QUEUE01);
        }

        @Bean
        public Queue queue02() {
            return new Queue(QUEUE02);
        }

        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange(EXCHANGE);
        }

        // queue_direct01 receives messages whose routing key is exactly "queue.red"
        @Bean
        public Binding binding01() {
            return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01);
        }

        // queue_direct02 receives messages whose routing key is exactly "queue.green"
        @Bean
        public Binding binding02() {
            return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02);
        }
    }

2. MQSender.java

    package com.xxxx.seckill.rabbitmq;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Service
    @Slf4j
    public class MQSender {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send01(Object msg) {
            log.info("Sending red message: " + msg);
            rabbitTemplate.convertAndSend("directExchange", "queue.red", msg);
        }

        public void send02(Object msg) {
            log.info("Sending green message: " + msg);
            rabbitTemplate.convertAndSend("directExchange", "queue.green", msg);
        }
    }

3. MQReceiver.java

    package com.xxxx.seckill.rabbitmq;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Service
    @Slf4j
    public class MQReceiver {

        @RabbitListener(queues = "queue_direct01")
        public void receive01(Object msg) {
            log.info("QUEUE01 received message: " + msg);
        }

        @RabbitListener(queues = "queue_direct02")
        public void receive02(Object msg) {
            log.info("QUEUE02 received message: " + msg);
        }
    }

4. UserController.java

    /**
     * Test sending a RabbitMQ message
     */
    @RequestMapping("/mq/direct01")
    @ResponseBody
    public void mq01() {
        mqSender.send01("Hello,Red");
    }

    /**
     * Test sending a RabbitMQ message
     */
    @RequestMapping("/mq/direct02")
    @ResponseBody
    public void mq02() {
        mqSender.send02("Hello,Green");
    }

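5. Test

Call the mq/direct01 endpoint; the message is forwarded to the queue_direct01 queue via the queue.red routing key bound on the exchange.

Call the mq/direct02 endpoint; the message is forwarded to the queue_direct02 queue via the queue.green routing key bound on the exchange.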

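Topic Mode

Every message sent to a topic exchange is forwarded to all queues interested in the topic named by its RouteKey.
The exchange fuzzy-matches the RouteKey against topic patterns, so each queue must be bound with a topic pattern.
The matching rules are as follows:
A routing key is a string of words separated by periods (each period-delimited segment is called a word), for example "stock.usd.nyse", "nyse.vmw", or "quick.orange.rabbit".
A binding pattern may contain two special characters, * and #, for fuzzy matching: * matches exactly one word, and # matches any number of words (including zero).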
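The wildcard rules can be checked with a short word-by-word matcher. This is a minimal sketch of the matching rule only, not the broker's implementation; the class `TopicMatchSketch` and its `matches` method are hypothetical names, and the patterns and keys in `main` are the ones used later in this section.

```java
// Hypothetical matcher for topic binding patterns.
class TopicMatchSketch {

    // True if the routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        // Limit -1 keeps empty trailing words instead of silently dropping them.
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length; // pattern exhausted: the key must be exhausted too
        }
        if (pattern[p].equals("#")) {
            // '#' matches zero words, or consumes one word and stays in place
            return match(pattern, p + 1, key, k)
                    || (k < key.length && match(pattern, p, key, k + 1));
        }
        if (k == key.length) {
            return false; // a literal word or '*' needs a word to consume
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return match(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("#.queue.#", "queue.red.message"));       // true
        System.out.println(matches("*.queue.#", "queue.red.message"));       // false
        System.out.println(matches("#.queue.#", "message.queue.green.abc")); // true
        System.out.println(matches("*.queue.#", "message.queue.green.abc")); // true
    }
}
```

The second case fails because * must consume exactly one word before queue, while in queue.red.message the word queue comes first.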

1. RabbitMQConfig.java

    package com.xxxx.seckill.config;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Configuration
    public class RabbitMQConfig {

        private static final String QUEUE01 = "queue_topic01";
        private static final String QUEUE02 = "queue_topic02";
        private static final String EXCHANGE = "topicExchange";
        // Binding patterns: # matches zero or more words, * matches exactly one word
        private static final String ROUTINGKEY01 = "#.queue.#";
        private static final String ROUTINGKEY02 = "*.queue.#";

        @Bean
        public Queue queue01() {
            return new Queue(QUEUE01);
        }

        @Bean
        public Queue queue02() {
            return new Queue(QUEUE02);
        }

        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(EXCHANGE);
        }

        @Bean
        public Binding binding01() {
            return BindingBuilder.bind(queue01()).to(topicExchange()).with(ROUTINGKEY01);
        }

        @Bean
        public Binding binding02() {
            return BindingBuilder.bind(queue02()).to(topicExchange()).with(ROUTINGKEY02);
        }
    }

2. MQSender.java

    package com.xxxx.seckill.rabbitmq;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Service
    @Slf4j
    public class MQSender {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send01(Object msg) {
            log.info("Sending message (received by queue 01): " + msg);
            // Matches only "#.queue.#"
            rabbitTemplate.convertAndSend("topicExchange", "queue.red.message", msg);
        }

        public void send02(Object msg) {
            log.info("Sending message (received by both queues): " + msg);
            // Matches both "#.queue.#" and "*.queue.#"
            rabbitTemplate.convertAndSend("topicExchange", "message.queue.green.abc", msg);
        }
    }

3. MQReceiver.java

    package com.xxxx.seckill.rabbitmq;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Service
    @Slf4j
    public class MQReceiver {

        @RabbitListener(queues = "queue_topic01")
        public void receive01(Object msg) {
            log.info("QUEUE01 received message: " + msg);
        }

        @RabbitListener(queues = "queue_topic02")
        public void receive02(Object msg) {
            log.info("QUEUE02 received message: " + msg);
        }
    }

4. UserController.java

    /**
     * Test sending a RabbitMQ message
     */
    @RequestMapping("/mq/topic01")
    @ResponseBody
    public void mq01() {
        mqSender.send01("Hello,Red");
    }

    /**
     * Test sending a RabbitMQ message
     */
    @RequestMapping("/mq/topic02")
    @ResponseBody
    public void mq02() {
        mqSender.send02("Hello,Green");
    }

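5. Test

Call the mq/topic01 endpoint; the routing key queue.red.message matches only the #.queue.# binding, so the message is forwarded to the queue_topic01 queue.

Call the mq/topic02 endpoint; the routing key message.queue.green.abc matches both *.queue.# and #.queue.#, so the message is forwarded to the queue_topic01 and queue_topic02 queues.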

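Headers Mode

A headers exchange does not rely on the routing key; it matches queues against the headers in the basicProperties object of the published message.
headers is a set of key-value pairs, and the values can be of any type.
When a queue is bound to the exchange, x-match specifies the rule: all means every listed key-value pair must match, while any means a single matching pair is enough.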
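The all/any rule can be expressed as a count of matching pairs. This is a simplified model of the rule described above, not broker code; the class `HeadersMatchSketch` and its methods are hypothetical names, and the header values in `main` are the ones used later in this section.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical matcher for headers-exchange bindings.
class HeadersMatchSketch {

    // Demo helper: builds a map from alternating key/value arguments.
    static Map<String, Object> mapOf(Object... keyValues) {
        Map<String, Object> map = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            map.put((String) keyValues[i], keyValues[i + 1]);
        }
        return map;
    }

    // xMatch is "all" or "any"; anything other than "all" is treated as "any" in this sketch.
    static boolean matches(Map<String, Object> bindingArgs, String xMatch, Map<String, Object> headers) {
        int hits = 0;
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            // A pair matches when the message carries the same key with an equal value.
            if (headers.containsKey(arg.getKey())
                    && Objects.equals(arg.getValue(), headers.get(arg.getKey()))) {
                hits++;
            }
        }
        return "all".equals(xMatch) ? hits == bindingArgs.size() : hits > 0;
    }

    public static void main(String[] args) {
        Map<String, Object> binding01 = mapOf("color", "red", "speed", "low");  // x-match any
        Map<String, Object> binding02 = mapOf("color", "red", "speed", "fast"); // x-match all
        Map<String, Object> fastRed = mapOf("color", "red", "speed", "fast");
        Map<String, Object> normalRed = mapOf("color", "red", "speed", "normal");

        System.out.println(matches(binding01, "any", fastRed));   // true
        System.out.println(matches(binding02, "all", fastRed));   // true
        System.out.println(matches(binding01, "any", normalRed)); // true
        System.out.println(matches(binding02, "all", normalRed)); // false
    }
}
```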

1. RabbitMQConfig.java

    package com.xxxx.seckill.config;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.HeadersExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Configuration
    public class RabbitMQConfig {

        private static final String QUEUE01 = "queue_header01";
        private static final String QUEUE02 = "queue_header02";
        private static final String EXCHANGE = "headersExchange";

        @Bean
        public Queue queue01() {
            return new Queue(QUEUE01);
        }

        @Bean
        public Queue queue02() {
            return new Queue(QUEUE02);
        }

        @Bean
        public HeadersExchange headersExchange() {
            return new HeadersExchange(EXCHANGE);
        }

        // x-match any: one matching header pair is enough
        @Bean
        public Binding binding01() {
            Map<String, Object> map = new HashMap<>();
            map.put("color", "red");
            map.put("speed", "low");
            return BindingBuilder.bind(queue01()).to(headersExchange()).whereAny(map).match();
        }

        // x-match all: every header pair must match
        @Bean
        public Binding binding02() {
            Map<String, Object> map = new HashMap<>();
            map.put("color", "red");
            map.put("speed", "fast");
            return BindingBuilder.bind(queue02()).to(headersExchange()).whereAll(map).match();
        }
    }

2. MQSender.java

    package com.xxxx.seckill.rabbitmq;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    import java.nio.charset.StandardCharsets;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Service
    @Slf4j
    public class MQSender {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send01(String msg) {
            log.info("Sending message (received by both queues): " + msg);
            MessageProperties properties = new MessageProperties();
            properties.setHeader("color", "red");
            properties.setHeader("speed", "fast");
            // Encode explicitly as UTF-8 rather than relying on the platform charset
            Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), properties);
            rabbitTemplate.convertAndSend("headersExchange", "", message);
        }

        public void send02(String msg) {
            log.info("Sending message (received by queue 01): " + msg);
            MessageProperties properties = new MessageProperties();
            properties.setHeader("color", "red");
            properties.setHeader("speed", "normal");
            Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), properties);
            rabbitTemplate.convertAndSend("headersExchange", "", message);
        }
    }

3. MQReceiver.java

    package com.xxxx.seckill.rabbitmq;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;

    import java.nio.charset.StandardCharsets;

    /**
     * @author zhoubin
     * @since 1.0.0
     */
    @Service
    @Slf4j
    public class MQReceiver {

        @RabbitListener(queues = "queue_header01")
        public void receive01(Message message) {
            log.info("QUEUE01 received Message object: " + message);
            log.info("QUEUE01 received message: " + new String(message.getBody(), StandardCharsets.UTF_8));
        }

        @RabbitListener(queues = "queue_header02")
        public void receive02(Message message) {
            log.info("QUEUE02 received Message object: " + message);
            log.info("QUEUE02 received message: " + new String(message.getBody(), StandardCharsets.UTF_8));
        }
    }

4. UserController.java

    /**
     * Test sending a RabbitMQ message
     */
    @RequestMapping("/mq/header01")
    @ResponseBody
    public void mq01() {
        mqSender.send01("Hello,header01");
    }

    /**
     * Test sending a RabbitMQ message
     */
    @RequestMapping("/mq/header02")
    @ResponseBody
    public void mq02() {
        mqSender.send02("Hello,header02");
    }

5. Test

queue_header01 is bound with x-match any and queue_header02 with x-match all, so calling the mq/header01 endpoint matches both queues.

Calling the mq/header02 endpoint matches only the queue_header01 queue.
