当前位置:   article > 正文

SpringCloud学习之路(四): SpringCloud 整合 RabbitMq(一)----RabbitMq基本原理及自动创建并监听队列_rabbitmq集成springcloud

rabbitmq集成springcloud

前言

什么是RabbitMq

rabbitMq是消息队列的一种;

那么问题来了,什么是消息队列呢?戳这里

消息队列中间件是大型系统中的重要组件,它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。
目前常见的消息中间件有ActiveMQ、RabbitMQ、ZeroMQ等。

我也想写的详细,写得多,写得好,但是实力不允许

 

所以请看下面

ActiveMQ、RabbitMQ、ZeroMQ三者比较

论综合实力,RabbitMq为最佳。(不要问,公司在用这个,强行解释一波)

首先,我们要安装rabbitMq

RabbitMq的安装与下载

 

rabbitMq 基本使用

 

rabbitMq 交换机类型及基本使用

 

简单的一个消息推送到接收的流程图

名词解释

Queue: 到这,大家基本都知道RabbitMQ就是消息队列的一种实现,那么围绕这个,我们就可以思考一个消息队列到底需要什么,当然是需要队列,那么这个队列就是Queue。Queue是RabbitMQ的内部对象,用于存储消息。可以指定name来唯一确定。

Exchange:交换机,接收消息,并根据路由键转发消息到绑定的队列,即通过binding-key 与 routing-key 的匹配关系来决定将消息分发到指定queue

RoutingKey:路由键,将消息路由到指定的队列,Exchange和Queue建立绑定关系的key,根据路由键,消息到交换机的时候

 

以上都是基本的使用,创建的时候都是用生产者去创建queue的,但是我们一般在正式的使用场景中,用的最多(我用的最多)的还是消费者自动创建交换机,路由key 及 queue。

 

在开始项目之前,我们需要做三件事:

1.添加依赖

 

  1. <!--rabbitmq-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

 

2. 了解两个注解:@RabbitHandler @RabbitListener()

@RabbitHandler:这个注解没什么意义,仅仅作为标识处理注解。可加可不加,建议是加上,不要问为什么,问也不知道,直接加上,不会错

@RabbitHandler注解没找到解释,上面解释仅根据某翻译提供的内容个人理解的!

 

@RabbitListener():这个表示监听指定的queue,也可以自动创建并监听queue。 划重点!

 

Spring-rabbit之@RabbitListener解析

有2种方式配置RabbitListener

一种使用RabbitListener标注类且使用RabbitHandler标注方法,

另一种使用RabbitListener标注方法,

两种模式同时存在也可以。

使用RabbitHandler标注的方法和RabbitListener标注的方法都会被当做消费者的消息监听方法

 

 

3. 消息发送和接受确认(ACK):

默认情况下如果一个 消息 被消费者所正确接收则会被从 Queue 中移除,这种行为就叫做消息确认(ACK)

ACK分为自动ACK和手动ACK

自动ACK:消费者一旦接收消息,自动ACK

手动ACK:消息接收后,不会ACK,需调用 basicAck 方法

 

敲黑板,划重点:ack最好手动,自动ACK接收消息后就自动ACK,消息易丢失,万一这时候消费者服务挂掉,消息直接丢!!

当然,具体还是要看业务,一般来说,自动ACK比手动ACK要效率要高。

ACK解析

 

但是我们在实际开发中,ACK的时候没有上述资料说的那么复杂,

直接手动调用basicAck方法即可 :  channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

4.1  消费者(consumer-demo.yml) 配置相关参数:

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. # 访问帐号密码不可使用guest 需要新建
  6. username: text
  7. password: text
  8. #虚拟主机,可自定义
  9. virtual-host: /
  10. #生产者与broker之间的消息确认称为public confirms,
  11. #public confirms机制用于解决生产者与Rabbitmq服务器之间消息可靠传输,它在消息服务器持久化消息后通知消息生产者发送成功。
  12. #发送确认
  13. publisher-confirms: true
  14. #如果在配置文件中没有设置这个ACK确认,那么消费者每次重启都会收到这个消息。可以结合confirm使用,处理生产者和消费者丢数据的问题。
  15. listener:
  16. # 消息发送确认 : AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认
  17. direct:
  18. acknowledge-mode: manual
  19. #消息接收确认模式: AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认
  20. simple:
  21. acknowledge-mode: manual
  22. #数字是几就是几个线程监听队列
  23. concurrency: 3
  24. #交换机配置
  25. rabbit:
  26. queue:
  27. name: text

 

4.2  生产者(product-demo.yml) 配置相关参数:

 

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. # 访问帐号密码不可使用guest 需要新建
  6. username: ets
  7. password: ets@123
  8. #虚拟主机
  9. virtual-host: /
  10. #生产者与broker之间的消息确认称为public confirms,
  11. #public confirms机制用于解决生产者与Rabbitmq服务器之间消息可靠传输,它在消息服务器持久化消息后通知消息生产者发送成功。
  12. #发送确认
  13. publisher-confirms: true
  14. #接收确认,默认关闭,建议false或不配置,在代码中根据实际情况进行ack销毁
  15. publisher-returns: true
  16. #如果在配置文件中没有设置这个ACK确认,那么消费者每次重启都会收到这个消息。可以结合confirm使用,处理生产者和消费者丢数据的问题
  17. listener:
  18. # 消息发送确认 : AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认
  19. direct:
  20. acknowledge-mode: manual
  21. #消息接收确认模式: AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认
  22. simple:
  23. acknowledge-mode: manual
  24. #数字是几就是几个线程监听队列
  25. concurrency: 3

 

配置文件注释有点详细,不要在意这些细节!不详细看不懂啊!

 

 

 

 

消费者和生产者的配置基本差不多。

一个生产者对应多个消费者(我用的toptic模式,看具体业务 : direct,直连,1:1;toptic,主题,1:n;fanout,广播,n:n),所以,生产者这边的配置文件不能配置自定义Exchange名称,需动态获取。当然了,direct模式下,自定义和动态创建都是可以的!

消费者这边的接收确认根据业务需求进行手动ack,所以  publisher-returns(接收消息确认,默认false) 默认就行,生产者这边则需要配置publisher-returns 为true ,自动确认。

以上为个人理解。有误请指出,谢谢!

 

 

 

 

------------------基本知识聊完,看下面,好戏开场了---------------------

RabbitMq四中交换机

 

打开我们项目,就前三集那个项目:

 

在consumer-demo 项目新建以下目录(在任意一个类都行,这是把rabbitMq的放在一起):

 

新建个方法,开始写:

 

  1. @RabbitHandler
  2. @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbit.queue.name}"+"-exchange",type = "topic"),
  3. value = @Queue(value = "${rabbit.queue.name}"+"-text-queue",durable = "true"),key = "${rabbit.queue.name}"+"-text-queue"))
  4. public void textRabbit(Channel channel,Message message){
  5. }

 

这里解释下各个名词:

 bindings:绑定,自动声明。不能和queue同时指定,否则报错

 @QueueBinding  : queue绑定

 exchange : 交换机指定

 @Exchange: 中的value是交换机名称,type为交换机类型

 @Queue:   中的value是queue名称,durable :持久化

 key:Routingkey。

 

(个人猜测理解,有误请指出, 感谢!)

Message: 看名字就知道,消息内容。直接   message.getBody()就可以取到消息。

Channel:  信道,TCP虚拟连接, AMQP的命令都是通过信道发送出去的,每条信道都会被指派一个唯一ID。一个TCP连接,对应多个信道,理论上无限制,减少TCP创建和销毁的开销,实现共用TCP的效果。但是一个生产者或一个消费者与MQ服务器之间只有一条TCP连接

 

完整方法:

  1. @RabbitHandler
  2. @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbit.queue.name}"+"-exchange",type = "topic"),
  3. value = @Queue(value = "${rabbit.queue.name}"+"-text-queue",durable = "true"),key = "${rabbit.queue.name}"+"-text-queue"))
  4. public void textRabbit(Channel channel,Message message){
  5. try {
  6. //msg就是队列内容,需自己处理
  7. String msg=new String(message.getBody(),"UTF-8");
  8. System.out.println(msg);
  9. } catch (Exception e) {
  10. e.printStackTrace();
  11. }finally {
  12. try {
  13. //ack
  14. channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  15. } catch (IOException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }

注意:

1.创建queue和key唯一,相同会直接报错!

2.别忘了ACK!!原因上面有!下面也有!

RabbitMq的Ack机制
 

一个是message(消息实体),一个是channel就是当前的通道,很多地方都没有说清楚怎么去手动ack,其实手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);


 

 

--------到这基本consumer的代码就结束了----------

 

下面来看product-demo:

首先和consumer-demo一样,创建一个单独的类

 

然后注入 RabbitTemplate,这个直接注入就行

 

  1. @Autowired
  2. RabbitTemplate rabbitTemplate;

 

在源码中,RabbitTemplate实现RabbitOperations,RabbitOperations实现了AmqpTemplate

 

 

 

 

我们直接调用convertAndSend方法来发送消息 ,convertAndSend能获取请求消息队列所返回的参数(反正根据需求来)

参数格式为:convertAndSend(Exchange,RouteKey,Message,ID)

Exchange,RouteKey都是消费者自动创建的,所以在生产者这里是动态的!消费者会根据不同业务(不同方法)创建Exchange,RouteKey,Queue,生产者这边根据Exchange,RouteKey就可以找到指定的Queue,把消息存在queue中

ID为消息唯一ID

以上根据需求自行选择参数!非必填!

这里由于是测试的,就用固定值了,想试试动态的话,自己封装公共方法自定义参数即可:

 

  1. @Component
  2. public class ProductRabbit {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. public String sendAndReceive() {
  6. rabbitTemplate.setReplyTimeout(1000 * 10);
  7. CorrelationData correlationData = new CorrelationData();
  8. //消息唯一ID
  9. correlationData.setId(UUID.randomUUID().toString());
  10. Map<String,Object> map=new HashMap<>();
  11. map.put("name","测试");
  12. map.put("sex",1);
  13. //sendMsg,需自己处理
  14. String sendMsg = JSON.toJSONString(JSON.toJSONString(map));
  15. try {
  16. Object replyMessage = rabbitTemplate.convertSendAndReceive("text-exchange", "text-queue", sendMsg, correlationData);
  17. if (replyMessage == null) {
  18. //可自行处理
  19. return "返回为null";
  20. } else {
  21. return replyMessage.toString();
  22. }
  23. } catch (Exception ex) {
  24. //自行处理异常
  25. return "系统异常";
  26. }
  27. }
  28. }

 

我们只需在另外的方法中调用这个即可!

 

 

 

附配置文件完整版(nacos上的配置文件):

consumer-application.yaml

 

  1. spring:
  2. application:
  3. name: consumer-application
  4. rabbitmq:
  5. host: 127.0.0.1
  6. port: 5672
  7. # 访问帐号密码不可使用guest 需要新建
  8. username: text
  9. password: text
  10. #虚拟主机
  11. virtual-host: /
  12. #生产者与broker之间的消息确认称为public confirms,
  13. #public confirms机制用于解决生产者与Rabbitmq服务器之间消息可靠传输,它在消息服务器持久化消息后通知消息生产者发送成功。
  14. #发送确认
  15. publisher-confirms: true
  16. #接收确认,默认关闭,建议false或不配置,在代码中根据实际情况进行ack销毁
  17. publisher-returns: false
  18. #如果在配置文件中没有设置这个ACK确认,那么消费者每次重启都会收到这个消息。可以结合confirm使用,处理生产者和消费者丢数据的问题
  19. listener:
  20. # 消息发送确认 : AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认
  21. direct:
  22. acknowledge-mode: manual
  23. #消息接收确认模式: AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认
  24. simple:
  25. acknowledge-mode: manual
  26. #数字是几就是几个线程监听队列
  27. concurrency: 3
  28. #交换机配置
  29. rabbit:
  30. queue:
  31. name: text
  32. server:
  33. port: 8096

 

全部配置结束,我们先启动consumer-demo, 启动后会自动创建一个queue!

 

点进去

 

 

 

我们关闭consumer-demo,打开product-demo,访问,返回OK!(不关闭consumer-demo的话,消息会直接被ACK!)

 

打开刚才的rabbitMq管理页面,会看到消息收到了!

三个单词分别表示:

Ready:待消费的消息总数。


Unacked:待应答的消息总数。


Total:总数 Ready+Unacked。

 

 

 

在页面找到 get message 点击 :  会看到发送的内容

 

 

我们启动consumer-demo

 

 

打开刚才的rabbitMq管理页面,会发现消息没了,而我们消费者已经拿到了数据,消息确认了,队列的消息就木了。

 

 

如果我们把ack去掉呢?

请看:

 

 

 

如果不进行ack的话,可能无论多少次,获取到的消息都不会消失(当然,rabbitMq有机制,到一定次数就不会推送了!!),这种情况在项目中可能会存在重复数据的可能性,根据需求进行选择。

 

 

----------------------------------------THE END --------------------------------

 

纯学习笔记,有误指出,谢谢!

 

昨天用家里电脑装rabbitMq翻车了

 

 

用公司电脑的环境弄的。晚上回去再试试。。。。。。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/390911
推荐阅读
相关标签
  

闽ICP备14008679号