赞
踩
前言
什么是RabbitMq
rabbitMq是消息队列的一种;
消息队列中间件是大型系统中的重要组件,它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。
目前常见的消息中间件有ActiveMQ、RabbitMQ、ZeroMQ等。
我也想写的详细,写得多,写得好,但是实力不允许
所以请看下面
论综合实力,RabbitMq为最佳。(不要问,公司在用这个,强行解释一波)
首先,我们要安装rabbitMq
rabbitMq 基本使用
简单的一个消息推送到接收的流程图
名词解释
Queue: 到这,大家基本都知道RabbitMQ就是消息队列的一种实现,那么围绕这个,我们就可以思考一个消息队列到底需要什么,当然是需要队列,那么这个队列就是Queue。Queue是RabbitMQ的内部对象,用于存储消息。可以指定name来唯一确定。
Exchange:交换机,接收消息,并根据路由键转发消息到绑定的队列,即通过binding-key 与 routing-key 的匹配关系来决定将消息分发到指定queue
RoutingKey:路由键,将消息路由到指定的队列,Exchange和Queue建立绑定关系的key,根据路由键,消息到交换机的时候
以上都是基本的使用,创建的时候都是用生产者去创建queue的,但是我们一般在正式的使用场景中,用的最多(我用的最多)的还是消费者自动创建交换机,路由key 及 queue。
在开始项目之前,我们需要做三件事:
1.添加依赖
- <!--rabbitmq-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
2. 了解两个注解:@RabbitHandler @RabbitListener()
@RabbitHandler:这个注解没什么意义,仅仅作为标识处理注解。可加可不加,建议是加上,不要问为什么,问也不知道,直接加上,不会错;
@RabbitHandler注解没找到解释,上面解释仅根据某翻译提供的内容个人理解的!
@RabbitListener():这个表示监听指定的queue,也可以自动创建并监听queue。 划重点!
Spring-rabbit之@RabbitListener解析
有2种方式配置RabbitListener
一种使用RabbitListener标注类且使用RabbitHandler标注方法,
另一种使用RabbitListener标注方法,
两种模式同时存在也可以。
使用RabbitHandler标注的方法和RabbitListener标注的方法都会被当做消费者的消息监听方法
3. 消息发送和接受确认(ACK):
默认情况下如果一个 消息 被消费者所正确接收则会被从 Queue 中移除,这种行为就叫做消息确认(ACK)
ACK分为自动ACK和手动ACK
自动ACK:消费者一旦接收消息,自动ACK
手动ACK:消息接收后,不会ACK,需调用 basicAck 方法
敲黑板,划重点:ack最好手动,自动ACK接收消息后就自动ACK,消息易丢失,万一这时候消费者服务挂掉,消息直接丢!!
当然,具体还是要看业务,一般来说,自动ACK比手动ACK要效率要高。
但是我们在实际开发中,ACK的时候没有上述资料说的那么复杂,
直接手动调用basicAck方法即可 : channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
4.1 消费者(consumer-demo.yml) 配置相关参数:
- spring:
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- # 访问帐号密码不可使用guest 需要新建
- username: text
- password: text
- #虚拟主机,可自定义
- virtual-host: /
- #生产者与broker之间的消息确认称为public confirms,
- #public confirms机制用于解决生产者与Rabbitmq服务器之间消息可靠传输,它在消息服务器持久化消息后通知消息生产者发送成功。
- #发送确认
- publisher-confirms: true
- #如果在配置文件中没有设置这个ACK确认,那么消费者每次重启都会收到这个消息。可以结合confirm使用,处理生产者和消费者丢数据的问题。
- listener:
- # 消息发送确认 : AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认
- direct:
- acknowledge-mode: manual
- #消息接收确认模式: AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认
- simple:
- acknowledge-mode: manual
- #数字是几就是几个线程监听队列
- concurrency: 3
-
- #交换机配置
- rabbit:
- queue:
- name: text
4.2 生产者(product-demo.yml) 配置相关参数:
- spring:
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- # 访问帐号密码不可使用guest 需要新建
- username: ets
- password: ets@123
- #虚拟主机
- virtual-host: /
- #生产者与broker之间的消息确认称为public confirms,
- #public confirms机制用于解决生产者与Rabbitmq服务器之间消息可靠传输,它在消息服务器持久化消息后通知消息生产者发送成功。
- #发送确认
- publisher-confirms: true
- #接收确认,默认关闭,建议false或不配置,在代码中根据实际情况进行ack销毁
- publisher-returns: true
- #如果在配置文件中没有设置这个ACK确认,那么消费者每次重启都会收到这个消息。可以结合confirm使用,处理生产者和消费者丢数据的问题
- listener:
- # 消息发送确认 : AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认
- direct:
- acknowledge-mode: manual
- #消息接收确认模式: AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认
- simple:
- acknowledge-mode: manual
- #数字是几就是几个线程监听队列
- concurrency: 3
配置文件注释有点详细,不要在意这些细节!不详细看不懂啊!
消费者和生产者的配置基本差不多。
一个生产者对应多个消费者(我用的toptic模式,看具体业务 : direct,直连,1:1;toptic,主题,1:n;fanout,广播,n:n),所以,生产者这边的配置文件不能配置自定义Exchange名称,需动态获取。当然了,direct模式下,自定义和动态创建都是可以的!
消费者这边的接收确认根据业务需求进行手动ack,所以 publisher-returns(接收消息确认,默认false) 默认就行,生产者这边则需要配置publisher-returns 为true ,自动确认。
以上为个人理解。有误请指出,谢谢!
------------------基本知识聊完,看下面,好戏开场了---------------------
打开我们项目,就前三集那个项目:
在consumer-demo 项目新建以下目录(在任意一个类都行,这是把rabbitMq的放在一起):
新建个方法,开始写:
- @RabbitHandler
- @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbit.queue.name}"+"-exchange",type = "topic"),
- value = @Queue(value = "${rabbit.queue.name}"+"-text-queue",durable = "true"),key = "${rabbit.queue.name}"+"-text-queue"))
- public void textRabbit(Channel channel,Message message){
-
- }
这里解释下各个名词:
bindings:绑定,自动声明。不能和queue同时指定,否则报错
@QueueBinding : queue绑定
exchange : 交换机指定
@Exchange: 中的value是交换机名称,type为交换机类型
@Queue: 中的value是queue名称,durable :持久化
key:Routingkey。
(个人猜测理解,有误请指出, 感谢!)
Message: 看名字就知道,消息内容。直接 message.getBody()就可以取到消息。
Channel: 信道,TCP虚拟连接, AMQP的命令都是通过信道发送出去的,每条信道都会被指派一个唯一ID。一个TCP连接,对应多个信道,理论上无限制,减少TCP创建和销毁的开销,实现共用TCP的效果。但是一个生产者或一个消费者与MQ服务器之间只有一条TCP连接
完整方法:
- @RabbitHandler
- @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbit.queue.name}"+"-exchange",type = "topic"),
- value = @Queue(value = "${rabbit.queue.name}"+"-text-queue",durable = "true"),key = "${rabbit.queue.name}"+"-text-queue"))
- public void textRabbit(Channel channel,Message message){
- try {
- //msg就是队列内容,需自己处理
- String msg=new String(message.getBody(),"UTF-8");
- System.out.println(msg);
- } catch (Exception e) {
- e.printStackTrace();
- }finally {
- try {
- //ack
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
注意:
1.创建queue和key唯一,相同会直接报错!
2.别忘了ACK!!原因上面有!下面也有!
一个是message(消息实体),一个是channel就是当前的通道,很多地方都没有说清楚怎么去手动ack,其实手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
--------到这基本consumer的代码就结束了----------
下面来看product-demo:
首先和consumer-demo一样,创建一个单独的类
然后注入 RabbitTemplate,这个直接注入就行
- @Autowired
- RabbitTemplate rabbitTemplate;
在源码中,RabbitTemplate实现RabbitOperations,RabbitOperations实现了AmqpTemplate
我们直接调用convertAndSend方法来发送消息 ,convertAndSend能获取请求消息队列所返回的参数(反正根据需求来)
参数格式为:convertAndSend(Exchange,RouteKey,Message,ID)
Exchange,RouteKey都是消费者自动创建的,所以在生产者这里是动态的!消费者会根据不同业务(不同方法)创建Exchange,RouteKey,Queue,生产者这边根据Exchange,RouteKey就可以找到指定的Queue,把消息存在queue中
ID为消息唯一ID
以上根据需求自行选择参数!非必填!
这里由于是测试的,就用固定值了,想试试动态的话,自己封装公共方法自定义参数即可:
- @Component
- public class ProductRabbit {
-
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- public String sendAndReceive() {
- rabbitTemplate.setReplyTimeout(1000 * 10);
-
- CorrelationData correlationData = new CorrelationData();
- //消息唯一ID
- correlationData.setId(UUID.randomUUID().toString());
- Map<String,Object> map=new HashMap<>();
- map.put("name","测试");
- map.put("sex",1);
- //sendMsg,需自己处理
- String sendMsg = JSON.toJSONString(JSON.toJSONString(map));
- try {
- Object replyMessage = rabbitTemplate.convertSendAndReceive("text-exchange", "text-queue", sendMsg, correlationData);
- if (replyMessage == null) {
- //可自行处理
- return "返回为null";
- } else {
- return replyMessage.toString();
- }
- } catch (Exception ex) {
- //自行处理异常
- return "系统异常";
- }
- }
- }
我们只需在另外的方法中调用这个即可!
附配置文件完整版(nacos上的配置文件):
consumer-application.yaml
- spring:
- application:
- name: consumer-application
-
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- # 访问帐号密码不可使用guest 需要新建
- username: text
- password: text
- #虚拟主机
- virtual-host: /
- #生产者与broker之间的消息确认称为public confirms,
- #public confirms机制用于解决生产者与Rabbitmq服务器之间消息可靠传输,它在消息服务器持久化消息后通知消息生产者发送成功。
- #发送确认
- publisher-confirms: true
- #接收确认,默认关闭,建议false或不配置,在代码中根据实际情况进行ack销毁
- publisher-returns: false
- #如果在配置文件中没有设置这个ACK确认,那么消费者每次重启都会收到这个消息。可以结合confirm使用,处理生产者和消费者丢数据的问题
- listener:
- # 消息发送确认 : AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认
- direct:
- acknowledge-mode: manual
- #消息接收确认模式: AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认
- simple:
- acknowledge-mode: manual
- #数字是几就是几个线程监听队列
- concurrency: 3
-
- #交换机配置
- rabbit:
- queue:
- name: text
-
- server:
- port: 8096
全部配置结束,我们先启动consumer-demo, 启动后会自动创建一个queue!
点进去
我们关闭consumer-demo,打开product-demo,访问,返回OK!(不关闭consumer-demo的话,消息会直接被ACK!)
打开刚才的rabbitMq管理页面,会看到消息收到了!
三个单词分别表示:
Ready:待消费的消息总数。
Unacked:待应答的消息总数。
Total:总数 Ready+Unacked。
在页面找到 get message 点击 : 会看到发送的内容
我们启动consumer-demo
打开刚才的rabbitMq管理页面,会发现消息没了,而我们消费者已经拿到了数据,消息确认了,队列的消息就木了。
如果我们把ack去掉呢?
请看:
如果不进行ack的话,可能无论多少次,获取到的消息都不会消失(当然,rabbitMq有机制,到一定次数就不会推送了!!),这种情况在项目中可能会存在重复数据的可能性,根据需求进行选择。
----------------------------------------THE END --------------------------------
纯学习笔记,有误指出,谢谢!
昨天用家里电脑装rabbitMq翻车了
用公司电脑的环境弄的。晚上回去再试试。。。。。。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。