赞
踩
该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,provider消息推送实例,consumer消息消费实例,Direct、Topic、Fanout的使用,消息回调、手动确认等。 (但是关于rabbitMq的安装,就不介绍了)
在安装完rabbitMq后,输入http://ip:15672/ ,是可以看到一个简单后台管理界面的。
在这个界面里面我们可以做些什么?
可以手动创建虚拟host,创建用户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等。
以上这些管理界面的操作在这篇暂时不做扩展描述,我想着重介绍后面实例里会使用到的。
首先先介绍一个简单的一个消息推送到接收的流程,提供一个简单的图:
黄色的圈圈就是我们的消息推送服务,将消息推送到 中间方框里面也就是 rabbitMq的服务器,然后经过服务器里面的交换机、队列等各种关系(后面会详细讲)将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息。
常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种:
Direct Exchange
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。
Fanout Exchange
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
Topic Exchange
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单地介绍下规则:
①*(星号)仅代表一个单词
②#(井号)代表任意个单词
上面几个图就已经概述了几个要点,而且,这几个要点的含义可以说是字如其名!
本次实例教程需要创建2个springboot项目,一个 rabbitmq-provider (生产者),一个rabbitmq-consumer(消费者)。
- <!--rabbitmq-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- # Tomcat
- server:
- tomcat:
- uri-encoding: UTF-8
- max-threads: 1000
- min-spare-threads: 30
- port: 8016
- servlet:
- context-path: /provider
-
- spring:
- #配置rabbitMq 服务器
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
-
创建DirectRabbitConfig.java(对于队列和交换机持久化以及连接使用设置,在注释里有说明,后面的不同交换机的配置就不做同样说明了):
- @Configuration
- public class DirectRabbitConfig {
-
- //队列 起名:TestDirectQueue
- @Bean
- public Queue TestDirectQueue() {
- return new Queue("TestDirectQueue",true);
- }
-
- //Direct交换机 起名:TestDirectExchange
- @Bean
- DirectExchange TestDirectExchange() {
- return new DirectExchange("TestDirectExchange");
- }
-
- //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
- @Bean
- Binding bindingDirect() {
- return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
- }
- }
然后写个简单的接口进行消息推送(根据需求也可以改为定时任务等等,具体看需求),SendMessageController.java:
- @RestController
- @RequestMapping("/producer")
- @Api(tags = "生产者模块")
- public class ProducerController {
-
- @Resource
- RabbitTemplate rabbitTemplate;
-
- @GetMapping("/sendDirectMessage")
- @ApiOperation(value = "sendDirectMessage")
- @ApiOperationSupport(order = 1)
- public String sendDirectMessage(@RequestParam String msg){
- rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",msg);
- return "ok";
- }
- }
把rabbitmq-provider项目运行,调用下接口:
因为我们目前还没弄消费者 rabbitmq-consumer,消息没有被消费的,我们去rabbitMq管理页面看看,是否推送成功:
再看看队列(界面上的各个英文项代表什么意思,可以自己查查哈,对理解还是有帮助的):
很好,消息已经推送到rabbitMq服务器上面了。
接下来,创建rabbitmq-consumer项目:
pom.xml里的jar依赖:
- <!--rabbitmq-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
然后是 application.yml:
- # Tomcat
- server:
- tomcat:
- uri-encoding: UTF-8
- max-threads: 1000
- min-spare-threads: 30
- port: 8015
- servlet:
- context-path: /customer
-
- spring:
- #配置rabbitMq 服务器
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
然后一样,创建DirectRabbitConfig.java(消费者单纯的使用,其实可以不用添加这个配置,直接建后面的监听就好,使用注解来让监听器监听对应的队列即可。配置上了的话,其实消费者也是生成者的身份,也能推送该消息。):
- @Configuration
- public class DirectRabbitConfig {
-
- //队列 起名:TestDirectQueue
- @Bean
- public Queue TestDirectQueue() {
- return new Queue("TestDirectQueue",true);
- }
-
- //Direct交换机 起名:TestDirectExchange
- @Bean
- DirectExchange TestDirectExchange() {
- return new DirectExchange("TestDirectExchange");
- }
-
- //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
- @Bean
- Binding bindingDirect() {
- return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
- }
- }
然后是创建消息接收监听类,DirectReceiver.java:
- /**
- * 消费者,用于消费队列信息
- */
- @Component
- @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
- public class DirectConsumer {
-
- @RabbitHandler
- public void process(String testMessage) {
- System.out.println("DirectReceiver消费者收到消息 : " + testMessage);
- }
-
- }
然后将rabbitmq-consumer项目运行起来,可以看到把之前推送的那条消息消费下来了:
然后可以再继续调用rabbitmq-provider项目的推送消息接口,可以看到消费者即时消费消息:
那么直连交换机既然是一对一,那如果咱们配置多台监听绑定到同一个直连交互的同一个队列,会怎么样?
可以看到是实现了轮询的方式对消息进行消费,而且不存在重复消费。
在rabbitmq-provider项目里面创建TopicRabbitConfig.java:
- @Configuration
- public class TopicRabbitConfig {
-
- @Bean
- TopicExchange exchange() {
- return new TopicExchange("TestTopicExchange");
- }
-
- @Bean
- public Queue firstQueue() {
- return new Queue("TestTopicQueue1");
- }
-
- @Bean
- public Queue secondQueue() {
- return new Queue("TestTopicQueue2");
- }
-
-
- //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
- //这样只要是消息携带的路由键是topic.man,才会分发到该队列
- @Bean
- Binding bindingExchangeMessage() {
- return BindingBuilder.bind(firstQueue()).to(exchange()).with("topic.man");
- }
-
- //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
- // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
- @Bean
- Binding bindingExchangeMessage2() {
- return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
- }
-
- }
然后添加多2个接口,用于推送消息到主题交换机:
- @GetMapping("/sendTopicMessage1")
- @ApiOperation(value = "sendTopicMessage1")
- @ApiOperationSupport(order = 2)
- public String sendTopicMessage1() {
- rabbitTemplate.convertAndSend("TestTopicExchange", "topic.man", "推送消息,路由键为topic.man");
- return "ok";
- }
-
- @GetMapping("/sendTopicMessage2")
- @ApiOperation(value = "sendTopicMessage2")
- @ApiOperationSupport(order = 3)
- public String sendTopicMessage2() {
- rabbitTemplate.convertAndSend("TestTopicExchange", "topic.woman", "推送消息,路由键为topic.woman");
- return "ok";
- }
生产者这边已经完事,先不急着运行,在rabbitmq-consumer项目上,创建1个消费者
- /**
- * 消费者,用于消费队列信息
- */
- @Component
- @RabbitListener(queues = "TestTopicQueue1")
- public class TopicConsumer1 {
-
- @RabbitHandler
- public void process(String testMessage) {
- System.out.println("TopicConsumer1消费者收到消息 : " + testMessage);
- }
- }
再创建一个消费者
- /**
- * 消费者,用于消费队列信息
- */
- @Component
- @RabbitListener(queues = "TestTopicQueue2")
- public class TopicConsumer2 {
-
- @RabbitHandler
- public void process(String testMessage) {
- System.out.println("TopicConsumer2消费者收到消息 : " + testMessage);
- }
- }
然后把rabbitmq-provider,rabbitmq-consumer两个项目都跑起来,先调用/sendTopicMessage1
然后看消费者rabbitmq-consumer的控制台输出情况:
TopicConsumer1 监听队列1,绑定键为:topic.man
TopicConsumer2 监听队列2,绑定键为:topic.#
而当前推送的消息,携带的路由键为:topic.man
所以可以看到两个监听消费者receiver都成功消费到了消息,因为这两个消费者监听的队列的绑定键都能与这条消息携带的路由键匹配上。
TopicConsumer1消费者收到消息 : 推送消息,路由键为topic.man
TopicConsumer2消费者收到消息 : 推送消息,路由键为topic.man
接下来调用接口/sendTopicMessage2:
然后看消费者rabbitmq-consumer的控制台输出情况:
TopicConsumer1 监听队列1,绑定键为:topic.man
TopicConsumer2 监听队列2,绑定键为:topic.#
而当前推送的消息,携带的路由键为:topic.woman
所以可以看到两个监听消费者只有TopicConsumer2成功消费到了消息。
TopicConsumer2消费者收到消息 : 推送消息,路由键为topic.woman
同样地,先在rabbitmq-provider项目上创建FanoutRabbitConfig.java:
- @Configuration
- public class FanoutRabbitConfig {
-
- @Bean
- FanoutExchange fanoutExchange() {
- return new FanoutExchange("TestFanoutExchange");
- }
-
- /**
- * 创建三个队列 :fanout.A fanout.B fanout.C
- * 将三个队列都绑定在交换机 fanoutExchange 上
- * 因为是扇型交换机, 路由键无需配置,配置也不起作用
- */
- @Bean
- public Queue queueA() {
- return new Queue("fanout.A");
- }
-
- @Bean
- public Queue queueB() {
- return new Queue("fanout.B");
- }
-
- @Bean
- public Queue queueC() {
- return new Queue("fanout.C");
- }
-
- @Bean
- Binding bindingExchangeA() {
- return BindingBuilder.bind(queueA()).to(fanoutExchange());
- }
-
- @Bean
- Binding bindingExchangeB() {
- return BindingBuilder.bind(queueB()).to(fanoutExchange());
- }
-
- @Bean
- Binding bindingExchangeC() {
- return BindingBuilder.bind(queueC()).to(fanoutExchange());
- }
- }
然后是写一个接口用于推送消息
- @GetMapping("/sendFanoutMessage")
- @ApiOperation(value = "sendFanoutMessage")
- @ApiOperationSupport(order = 4)
- public String sendFanoutMessage() {
- rabbitTemplate.convertAndSend("TestFanoutExchange", null, "推送消息");
- return "ok";
- }
接着在rabbitmq-consumer项目里加上3个消费类
- @Component
- @RabbitListener(queues = "fanout.A")
- public class FanoutConsumerA {
-
- @RabbitHandler
- public void process(String testMessage) {
- System.out.println("FanoutConsumerA消费者收到消息 : " +testMessage.toString());
- }
-
- }
- @Component
- @RabbitListener(queues = "fanout.B")
- public class FanoutConsumerB {
-
- @RabbitHandler
- public void process(String testMessage) {
- System.out.println("FanoutConsumerB消费者收到消息 : " +testMessage.toString());
- }
-
- }
- @Component
- @RabbitListener(queues = "fanout.C")
- public class FanoutConsumerC {
-
- @RabbitHandler
- public void process(String testMessage) {
- System.out.println("FanoutConsumerC消费者收到消息 : " +testMessage.toString());
- }
-
- }
最后将rabbitmq-provider和rabbitmq-consumer项目都跑起来,调用下接口/sendFanoutMessage
然后看看rabbitmq-consumer项目的控制台情况:
FanoutConsumerB消费者收到消息 : 推送消息
FanoutConsumerA消费者收到消息 : 推送消息
FanoutConsumerC消费者收到消息 : 推送消息
可以看到只要发送到 fanoutExchange 这个扇型交换机的消息, 三个队列都绑定这个交换机,所以三个消息接收类都监听到了这条消息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。