当前位置:   article > 正文

Springboot 整合RabbitMq_springboot整合rabbitmq

springboot整合rabbitmq

该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,provider消息推送实例,consumer消息消费实例,Direct、Topic、Fanout的使用,消息回调、手动确认等。 (但是关于rabbitMq的安装,就不介绍了)

在安装完rabbitMq后,输入http://ip:15672/ ,是可以看到一个简单后台管理界面的。

在这个界面里面我们可以做些什么?
可以手动创建虚拟host,创建用户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等。

以上这些管理界面的操作在这篇暂时不做扩展描述,我想着重介绍后面实例里会使用到的。

首先先介绍一个简单的一个消息推送到接收的流程,提供一个简单的图:

黄色的圈圈就是我们的消息推送服务,将消息推送到 中间方框里面也就是 rabbitMq的服务器,然后经过服务器里面的交换机、队列等各种关系(后面会详细讲)将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息。

常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种:

Direct Exchange 

直连型交换机,根据消息携带的路由键将消息投递给对应队列。

大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。

Fanout Exchange

扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

Topic Exchange

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单地介绍下规则:

①*(星号)仅代表一个单词
②#(井号)代表任意个单词

上面几个图就已经概述了几个要点,而且,这几个要点的含义可以说是字如其名!

  1. 生产者:发送消息的程序
  2. 消费者:监听接收消费消息的程序
  3. 消息:一串二进制数据流
  4. 队列:消息的暂存区/存储区
  5. 交换机:消息的中转站,用于接收分发消息。其中有 fanout、direct、topic、headers 四种
  6. 路由:相当于密钥/第三者,与交换机绑定即可路由消息到指定的队列

本次实例教程需要创建2个springboot项目,一个 rabbitmq-provider (生产者),一个rabbitmq-consumer(消费者)。

1、我们先使用下direct exchange(直连型交换机)

创建生产者 rabbitmq-provider

  1. <!--rabbitmq-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-web</artifactId>
  9. </dependency>
  1. # Tomcat
  2. server:
  3. tomcat:
  4. uri-encoding: UTF-8
  5. max-threads: 1000
  6. min-spare-threads: 30
  7. port: 8016
  8. servlet:
  9. context-path: /provider
  10. spring:
  11. #配置rabbitMq 服务器
  12. rabbitmq:
  13. host: 127.0.0.1
  14. port: 5672
  15. username: guest
  16. password: guest

创建DirectRabbitConfig.java(对于队列和交换机持久化以及连接使用设置,在注释里有说明,后面的不同交换机的配置就不做同样说明了):

  1. @Configuration
  2. public class DirectRabbitConfig {
  3. //队列 起名:TestDirectQueue
  4. @Bean
  5. public Queue TestDirectQueue() {
  6. return new Queue("TestDirectQueue",true);
  7. }
  8. //Direct交换机 起名:TestDirectExchange
  9. @Bean
  10. DirectExchange TestDirectExchange() {
  11. return new DirectExchange("TestDirectExchange");
  12. }
  13. //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
  14. @Bean
  15. Binding bindingDirect() {
  16. return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
  17. }
  18. }

然后写个简单的接口进行消息推送(根据需求也可以改为定时任务等等,具体看需求),SendMessageController.java:

  1. @RestController
  2. @RequestMapping("/producer")
  3. @Api(tags = "生产者模块")
  4. public class ProducerController {
  5. @Resource
  6. RabbitTemplate rabbitTemplate;
  7. @GetMapping("/sendDirectMessage")
  8. @ApiOperation(value = "sendDirectMessage")
  9. @ApiOperationSupport(order = 1)
  10. public String sendDirectMessage(@RequestParam String msg){
  11. rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",msg);
  12. return "ok";
  13. }
  14. }

把rabbitmq-provider项目运行,调用下接口:

因为我们目前还没弄消费者 rabbitmq-consumer,消息没有被消费的,我们去rabbitMq管理页面看看,是否推送成功:

再看看队列(界面上的各个英文项代表什么意思,可以自己查查哈,对理解还是有帮助的):

很好,消息已经推送到rabbitMq服务器上面了。

创建消费者

接下来,创建rabbitmq-consumer项目:

pom.xml里的jar依赖:

  1. <!--rabbitmq-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter</artifactId>
  9. </dependency>

然后是 application.yml:

  1. # Tomcat
  2. server:
  3. tomcat:
  4. uri-encoding: UTF-8
  5. max-threads: 1000
  6. min-spare-threads: 30
  7. port: 8015
  8. servlet:
  9. context-path: /customer
  10. spring:
  11. #配置rabbitMq 服务器
  12. rabbitmq:
  13. host: 127.0.0.1
  14. port: 5672
  15. username: guest
  16. password: guest

然后一样,创建DirectRabbitConfig.java(消费者单纯的使用,其实可以不用添加这个配置,直接建后面的监听就好,使用注解来让监听器监听对应的队列即可。配置上了的话,其实消费者也是生成者的身份,也能推送该消息。):

  1. @Configuration
  2. public class DirectRabbitConfig {
  3. //队列 起名:TestDirectQueue
  4. @Bean
  5. public Queue TestDirectQueue() {
  6. return new Queue("TestDirectQueue",true);
  7. }
  8. //Direct交换机 起名:TestDirectExchange
  9. @Bean
  10. DirectExchange TestDirectExchange() {
  11. return new DirectExchange("TestDirectExchange");
  12. }
  13. //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
  14. @Bean
  15. Binding bindingDirect() {
  16. return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
  17. }
  18. }

然后是创建消息接收监听类,DirectReceiver.java:

  1. /**
  2. * 消费者,用于消费队列信息
  3. */
  4. @Component
  5. @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
  6. public class DirectConsumer {
  7. @RabbitHandler
  8. public void process(String testMessage) {
  9. System.out.println("DirectReceiver消费者收到消息 : " + testMessage);
  10. }
  11. }

然后将rabbitmq-consumer项目运行起来,可以看到把之前推送的那条消息消费下来了:

然后可以再继续调用rabbitmq-provider项目的推送消息接口,可以看到消费者即时消费消息:

那么直连交换机既然是一对一,那如果咱们配置多台监听绑定到同一个直连交互的同一个队列,会怎么样?

可以看到是实现了轮询的方式对消息进行消费,而且不存在重复消费。


2、接着,我们使用Topic Exchange 主题交换机

在rabbitmq-provider项目里面创建TopicRabbitConfig.java:

  1. @Configuration
  2. public class TopicRabbitConfig {
  3. @Bean
  4. TopicExchange exchange() {
  5. return new TopicExchange("TestTopicExchange");
  6. }
  7. @Bean
  8. public Queue firstQueue() {
  9. return new Queue("TestTopicQueue1");
  10. }
  11. @Bean
  12. public Queue secondQueue() {
  13. return new Queue("TestTopicQueue2");
  14. }
  15. //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
  16. //这样只要是消息携带的路由键是topic.man,才会分发到该队列
  17. @Bean
  18. Binding bindingExchangeMessage() {
  19. return BindingBuilder.bind(firstQueue()).to(exchange()).with("topic.man");
  20. }
  21. //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
  22. // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
  23. @Bean
  24. Binding bindingExchangeMessage2() {
  25. return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
  26. }
  27. }

 然后添加多2个接口,用于推送消息到主题交换机:

  1. @GetMapping("/sendTopicMessage1")
  2. @ApiOperation(value = "sendTopicMessage1")
  3. @ApiOperationSupport(order = 2)
  4. public String sendTopicMessage1() {
  5. rabbitTemplate.convertAndSend("TestTopicExchange", "topic.man", "推送消息,路由键为topic.man");
  6. return "ok";
  7. }
  8. @GetMapping("/sendTopicMessage2")
  9. @ApiOperation(value = "sendTopicMessage2")
  10. @ApiOperationSupport(order = 3)
  11. public String sendTopicMessage2() {
  12. rabbitTemplate.convertAndSend("TestTopicExchange", "topic.woman", "推送消息,路由键为topic.woman");
  13. return "ok";
  14. }

生产者这边已经完事,先不急着运行,在rabbitmq-consumer项目上,创建1个消费者

  1. /**
  2. * 消费者,用于消费队列信息
  3. */
  4. @Component
  5. @RabbitListener(queues = "TestTopicQueue1")
  6. public class TopicConsumer1 {
  7. @RabbitHandler
  8. public void process(String testMessage) {
  9. System.out.println("TopicConsumer1消费者收到消息 : " + testMessage);
  10. }
  11. }

再创建一个消费者

  1. /**
  2. * 消费者,用于消费队列信息
  3. */
  4. @Component
  5. @RabbitListener(queues = "TestTopicQueue2")
  6. public class TopicConsumer2 {
  7. @RabbitHandler
  8. public void process(String testMessage) {
  9. System.out.println("TopicConsumer2消费者收到消息 : " + testMessage);
  10. }
  11. }

然后把rabbitmq-provider,rabbitmq-consumer两个项目都跑起来,先调用/sendTopicMessage1 

然后看消费者rabbitmq-consumer的控制台输出情况:

TopicConsumer1  监听队列1,绑定键为:topic.man
TopicConsumer2 监听队列2,绑定键为:topic.#
而当前推送的消息,携带的路由键为:topic.man  

所以可以看到两个监听消费者receiver都成功消费到了消息,因为这两个消费者监听的队列的绑定键都能与这条消息携带的路由键匹配上。

TopicConsumer1消费者收到消息  : 推送消息,路由键为topic.man
TopicConsumer2消费者收到消息  : 推送消息,路由键为topic.man

接下来调用接口/sendTopicMessage2:

然后看消费者rabbitmq-consumer的控制台输出情况:
TopicConsumer1   监听队列1,绑定键为:topic.man
TopicConsumer2   监听队列2,绑定键为:topic.#
而当前推送的消息,携带的路由键为:topic.woman

所以可以看到两个监听消费者只有TopicConsumer2成功消费到了消息。

TopicConsumer2消费者收到消息  : 推送消息,路由键为topic.woman


3、接下来是使用Fanout Exchang 扇型交换机

同样地,先在rabbitmq-provider项目上创建FanoutRabbitConfig.java:

  1. @Configuration
  2. public class FanoutRabbitConfig {
  3. @Bean
  4. FanoutExchange fanoutExchange() {
  5. return new FanoutExchange("TestFanoutExchange");
  6. }
  7. /**
  8. * 创建三个队列 :fanout.A fanout.B fanout.C
  9. * 将三个队列都绑定在交换机 fanoutExchange 上
  10. * 因为是扇型交换机, 路由键无需配置,配置也不起作用
  11. */
  12. @Bean
  13. public Queue queueA() {
  14. return new Queue("fanout.A");
  15. }
  16. @Bean
  17. public Queue queueB() {
  18. return new Queue("fanout.B");
  19. }
  20. @Bean
  21. public Queue queueC() {
  22. return new Queue("fanout.C");
  23. }
  24. @Bean
  25. Binding bindingExchangeA() {
  26. return BindingBuilder.bind(queueA()).to(fanoutExchange());
  27. }
  28. @Bean
  29. Binding bindingExchangeB() {
  30. return BindingBuilder.bind(queueB()).to(fanoutExchange());
  31. }
  32. @Bean
  33. Binding bindingExchangeC() {
  34. return BindingBuilder.bind(queueC()).to(fanoutExchange());
  35. }
  36. }

然后是写一个接口用于推送消息

  1. @GetMapping("/sendFanoutMessage")
  2. @ApiOperation(value = "sendFanoutMessage")
  3. @ApiOperationSupport(order = 4)
  4. public String sendFanoutMessage() {
  5. rabbitTemplate.convertAndSend("TestFanoutExchange", null, "推送消息");
  6. return "ok";
  7. }

接着在rabbitmq-consumer项目里加上3个消费类

  1. @Component
  2. @RabbitListener(queues = "fanout.A")
  3. public class FanoutConsumerA {
  4. @RabbitHandler
  5. public void process(String testMessage) {
  6. System.out.println("FanoutConsumerA消费者收到消息 : " +testMessage.toString());
  7. }
  8. }
  1. @Component
  2. @RabbitListener(queues = "fanout.B")
  3. public class FanoutConsumerB {
  4. @RabbitHandler
  5. public void process(String testMessage) {
  6. System.out.println("FanoutConsumerB消费者收到消息 : " +testMessage.toString());
  7. }
  8. }
  1. @Component
  2. @RabbitListener(queues = "fanout.C")
  3. public class FanoutConsumerC {
  4. @RabbitHandler
  5. public void process(String testMessage) {
  6. System.out.println("FanoutConsumerC消费者收到消息 : " +testMessage.toString());
  7. }
  8. }

最后将rabbitmq-provider和rabbitmq-consumer项目都跑起来,调用下接口/sendFanoutMessage 

然后看看rabbitmq-consumer项目的控制台情况:

FanoutConsumerB消费者收到消息  : 推送消息
FanoutConsumerA消费者收到消息  : 推送消息
FanoutConsumerC消费者收到消息  : 推送消息

可以看到只要发送到 fanoutExchange 这个扇型交换机的消息, 三个队列都绑定这个交换机,所以三个消息接收类都监听到了这条消息。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/405680
推荐阅读
相关标签
  

闽ICP备14008679号