赞
踩
在初学rabbitMq的时候,伙伴们肯定已经接触到了“发布确认”的概念,但是到了后期学习中,会接触到“springboot”中使用“发布确认”高级的概念。后者主要是解决什么问题呢?或者是什么样的场景引出这样的概念呢?
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 rabbitmq 重启期间生产者投递失败,导致消息丢失,需要手动处理和恢复。因此为了确保rabbitmq 的消息可靠投递,特别是在这样比较极端的情况,rabbitmq 集群不可用的时候,对无法投递的消息进行处理。
废话不说直接开始撸代码!!!在代码中解决实际问题~
接触到这里,对于一条完整的“rabbitmq消息”发布链的构成大家已经不陌生了。主要是由:“消息生产者”、“交换机”、“队列”、“消费者”四个方面构成,如图所示:
声明交换机“confirm_exchange”、声明队列“confirm_queue”、通过routing-key对交换机和队列进行绑定。
- package com.example.rabbitmq_demo.fabuquerengaoji;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @ClaseName: ConfirmConfig$
- * @Description:配置类 发布确认(高级)
- * @Author: wuhs
- * @Date: 2023/8/16$ 14:32$
- * 快捷键ctrl+shift+u 字母大小写转化
- */
- @Configuration
- public class ConfirmConfig {
- // 交换机
- public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
- // 队列
- public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
- // ROUTING-KEY
- public static final String CONFIRM_ROUTING_KEY = "key1";
-
- @Bean
- public DirectExchange confirmExchange() {
- return new DirectExchange(CONFIRM_EXCHANGE_NAME);
- }
-
- @Bean
- public Queue confirmQueue() {
- return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
- }
-
- @Bean
- public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,
- @Qualifier("confirmQueue") Queue queue) {
- //一般使用在项目中使用@Qualifier来限定注入的Bean。
- return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);
-
- }
- }
通过@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)来监听队列,以此“充当”消费者。这一块也没啥好说的,直接上代码!
- package com.example.rabbitmq_demo.fabuquerengaoji;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * @ClaseName: Consumer$
- * @Description:消费者
- * @Author: wuhs
- * @Date: 2023/8/16$ 15:18$】
- */
- @Slf4j
- @Component
- public class Consumer {
- @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
- public void reciverConfirmMessage(Message message) {
- String msg = new String(message.getBody());
- log.info("接收到的队列confirm.queue消息:{}", msg);
- }
- }
在最开始我们说到“确保rabbitmq 的消息可靠投递”的概念,那么具体如何确保呢?如果我们在消费者每次消费成功、未消费成功交换机都能进行“回调”确认,是不是就能知道哪些消息消费成功、哪些没有消费成功呢?
在RabbitTemplate中有一个方法接口(ConfirmCallback),我们只需要实现这个接口并实现“confirm”方法,并将它注入进RabbitTemplate工具中即可创建“回调”。具体代码如下:
- package com.example.rabbitmq_demo.fabuquerengaoji;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
-
- /**
- * @ClaseName: MyCallBack$
- * @Description:
- * @Author: wuhs
- * @Date: 2023/8/16$ 16:17$
- */
- @Slf4j
- @Component
- public class MyCallBack implements RabbitTemplate.ConfirmCallback {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void init() {
- // 注入
- rabbitTemplate.setConfirmCallback(this);
-
- }
- //交换机确认回调方法
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String reason) {
- String id = correlationData != null ? correlationData.getId() : "";
- if (ack) {//发消息 交换机接收到了消息 回调
- log.info("交换机已经收到了ID为:{}的消息", id);
- } else {//发消息 交换机没有接收到了消息 回调
- log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);
-
- }
- }
- }
confirm方法参数介绍:
* 1. correlationData 保存回调消息的ID及相关信息 * 2. 交换机是否收到了消息 ack=true(收到)、ack=false(未收到) * 3. reason 失败的原因
在这里需要注意!!这也是最容易踩得坑,不知道有没有小伙伴遇没遇到,“publisher-confirm-type: correlated”也声明了,但是项目创建启动发布消息之后“没有成功回调”的情况,查看了很多的文章,很多博主只配置了publisher-confirm-type、但是并没有开启“confirm 确认机制”,所以会存在“误导”,导致一直找不到失败的原因~具体正确配置,看代码:
- server:
- port: 8899
-
- spring:
- rabbitmq:
- host: 124.221.94.214
- port: 5672
- username: xgsm
- password: xgsm123
- # 发送者开启 confirm 确认机制
- publisher-confirms: true
- publisher-confirm-type: correlated
publisher-confirm-type参数介绍:
publisher-confirm-type这个参数一共有三种配置方法:
# NONE:禁用发布确认,是默认值。
# CORRELATED:发布消息后,交换机会触发回调方法。
# SIMPLE:有两种效果:
1:和CORRELATED一样会触发回调方法
2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,
# 要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。
这里演示三种情况。第一种为正常情况下,发送成功后的回调;第二种消息为发送失败、当交换机不存在则发送失败(模拟发送失败),所以将交换机名称修改即可
- package com.example.rabbitmq_demo.fabuquerengaoji;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- /**
- * @ClaseName: ProducerController$
- * @Description:消息生产者
- * @Author: wuhs
- * @Date: 2023/8/16$ 14:58$
- */
- @Slf4j
- @RestController
- @RequestMapping("/confirm")
- public class ProducerController {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- // @PathVariable主要作用:映射URL绑定的占位符
- @RequestMapping("/sendMessage/{message}")
- public void sendMessage(@PathVariable String message) {
- //正常发送
- CorrelationData correlationData = new CorrelationData("1");
- rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);
-
- //发送失败-交换机不存在的情况
- CorrelationData correlationData2 = new CorrelationData("2");
- rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"2", ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData2);
-
- log.info("发送的消息为:{}", message);
- }
- }
测试结果:
如果是routing-key错误,这种情况会触发回调嘛?让我们验证一下;修改routing-key为“错误值”
- CorrelationData correlationData3 = new CorrelationData("3");
- rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"key3", message,correlationData3);
测试结果:
通过结果可以看出,消息发送成功了,而且也触发了“成功的回调”。但是我们知道的是,由于路由失败,这里消费者并没有对消息进行消费,这是为什么呢?那是因为,在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。解决方式为:通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。具体操作如下
1、application.yml文件中添加消息回退配置
- # 发送者开启 return 确认机制
- publisher-returns: true
2、实现RabbitTemplate中的方法接口ReturnCallback,并实现“returnedMessage”方法,最后将类注入到RabbitTemplate的RabbitTemplate中,详细代码如下:
- package com.example.rabbitmq_demo.fabuquerengaoji;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
-
- /**
- * @ClaseName: MyCallBack$
- * @Description:
- * @Author: wuhs
- * @Date: 2023/8/16$ 16:17$
- */
- @Slf4j
- @Component
- public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void init() {
- // 注入
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnCallback(this);
-
- }
-
- /**
- * 交换机确认回调方法
- * 1、发消息 交换机接收到了消息 回调
- * 1.1 correlationData 保存回调消息的ID及相关信息
- * 1.2 交换机收到消息 ack=true
- * 2、发消息 交换机接收失败了 回调
- * 2.1 correlationData 保存回调消息的ID及相关信息
- * 2.2 交换机接收到消息 ack=false
- * 2.3 reason 失败的原因
- */
-
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String reason) {
- String id = correlationData != null ? correlationData.getId() : "";
- if (ack) {
- log.info("交换机已经收到了ID为:{}的消息", id);
- } else {
- log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);
-
- }
- }
-
- //可以在当消息传递的过长中不可达目的地时将消息返回给生产者
- // 只有不可待目的地的时候 才进行回退
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- log.info("消息{},被交换机{}退回,退回原因:{},路由key:{}", message, exchange, replyText, routingKey);
- }
- }
测试结果:
2023-08-17 10:36:32.476 INFO 21108 --- [221.94.214:5672] c.e.r.fabuquerengaoji.MyCallBack : 消息(Body:'消息确认发布测试' MessageProperties [headers={spring_returned_message_correlation=3}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),被交换机confirm_exchange退回,退回原因:NO_ROUTE,路由key:key1key3
问题解决!~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。