当前位置:   article > 正文

RabbitMq-发布确认高级(避坑指南版)_setpublisherconfirmtype

setpublisherconfirmtype

在初学rabbitMq的时候,伙伴们肯定已经接触到了“发布确认”的概念,但是到了后期学习中,会接触到“springboot”中使用“发布确认”高级的概念。后者主要是解决什么问题呢?或者是什么样的场景引出这样的概念呢?

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 rabbitmq 重启期间生产者投递失败,导致消息丢失,需要手动处理和恢复。因此为了确保rabbitmq 的消息可靠投递,特别是在这样比较极端的情况,rabbitmq 集群不可用的时候,对无法投递的消息进行处理。

废话不说直接开始撸代码!!!在代码中解决实际问题~

一、代码架构分析:

        接触到这里,对于一条完整的“rabbitmq消息”发布链的构成大家已经不陌生了。主要是由:“消息生产者”、“交换机”、“队列”、“消费者”四个方面构成,如图所示:

二、构造“配置类”代码: 

声明交换机“confirm_exchange”、声明队列“confirm_queue”、通过routing-key对交换机和队列进行绑定。

  1. package com.example.rabbitmq_demo.fabuquerengaoji;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. /**
  7. * @ClaseName: ConfirmConfig$
  8. * @Description:配置类 发布确认(高级)
  9. * @Author: wuhs
  10. * @Date: 2023/8/16$ 14:32$
  11. * 快捷键ctrl+shift+u 字母大小写转化
  12. */
  13. @Configuration
  14. public class ConfirmConfig {
  15. // 交换机
  16. public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
  17. // 队列
  18. public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
  19. // ROUTING-KEY
  20. public static final String CONFIRM_ROUTING_KEY = "key1";
  21. @Bean
  22. public DirectExchange confirmExchange() {
  23. return new DirectExchange(CONFIRM_EXCHANGE_NAME);
  24. }
  25. @Bean
  26. public Queue confirmQueue() {
  27. return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
  28. }
  29. @Bean
  30. public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange directExchange,
  31. @Qualifier("confirmQueue") Queue queue) {
  32. //一般使用在项目中使用@Qualifier来限定注入的Bean。
  33. return BindingBuilder.bind(queue).to(directExchange).with(CONFIRM_ROUTING_KEY);
  34. }
  35. }

三、构建消费者代码:

通过@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)来监听队列,以此“充当”消费者。这一块也没啥好说的,直接上代码!

  1. package com.example.rabbitmq_demo.fabuquerengaoji;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @ClaseName: Consumer$
  8. * @Description:消费者
  9. * @Author: wuhs
  10. * @Date: 2023/8/16$ 15:18$】
  11. */
  12. @Slf4j
  13. @Component
  14. public class Consumer {
  15. @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
  16. public void reciverConfirmMessage(Message message) {
  17. String msg = new String(message.getBody());
  18. log.info("接收到的队列confirm.queue消息:{}", msg);
  19. }
  20. }

四、创建“回调”方法

        在最开始我们说到“确保rabbitmq 的消息可靠投递”的概念,那么具体如何确保呢?如果我们在消费者每次消费成功、未消费成功交换机都能进行“回调”确认,是不是就能知道哪些消息消费成功、哪些没有消费成功呢?

        在RabbitTemplate中有一个方法接口(ConfirmCallback),我们只需要实现这个接口并实现“confirm”方法,并将它注入进RabbitTemplate工具中即可创建“回调”。具体代码如下:

  1. package com.example.rabbitmq_demo.fabuquerengaoji;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.connection.CorrelationData;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.PostConstruct;
  8. /**
  9. * @ClaseName: MyCallBack$
  10. * @Description:
  11. * @Author: wuhs
  12. * @Date: 2023/8/16$ 16:17$
  13. */
  14. @Slf4j
  15. @Component
  16. public class MyCallBack implements RabbitTemplate.ConfirmCallback {
  17. @Autowired
  18. private RabbitTemplate rabbitTemplate;
  19. @PostConstruct
  20. public void init() {
  21. // 注入
  22. rabbitTemplate.setConfirmCallback(this);
  23. }
  24. //交换机确认回调方法
  25. @Override
  26. public void confirm(CorrelationData correlationData, boolean ack, String reason) {
  27. String id = correlationData != null ? correlationData.getId() : "";
  28. if (ack) {//发消息 交换机接收到了消息 回调
  29. log.info("交换机已经收到了ID为:{}的消息", id);
  30. } else {//发消息 交换机没有接收到了消息 回调
  31. log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);
  32. }
  33. }
  34. }

confirm方法参数介绍:

* 1. correlationData 保存回调消息的ID及相关信息
* 2. 交换机是否收到了消息 ack=true(收到)、ack=false(未收到)
* 3. reason 失败的原因

 六、配置类声明:(application.yml)

        在这里需要注意!!这也是最容易踩得坑,不知道有没有小伙伴遇没遇到,“publisher-confirm-type: correlated”也声明了,但是项目创建启动发布消息之后“没有成功回调”的情况,查看了很多的文章,很多博主只配置了publisher-confirm-type、但是并没有开启“confirm 确认机制”,所以会存在“误导”,导致一直找不到失败的原因~具体正确配置,看代码:

  1. server:
  2. port: 8899
  3. spring:
  4. rabbitmq:
  5. host: 124.221.94.214
  6. port: 5672
  7. username: xgsm
  8. password: xgsm123
  9. # 发送者开启 confirm 确认机制
  10. publisher-confirms: true
  11. publisher-confirm-type: correlated

 publisher-confirm-type参数介绍:

publisher-confirm-type这个参数一共有三种配置方法:

# NONE:禁用发布确认,是默认值。

# CORRELATED:发布消息后,交换机会触发回调方法。

# SIMPLE:有两种效果:

1:和CORRELATED一样会触发回调方法

2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,

# 要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

 七、创建Controller层(消息生产者)

        这里演示三种情况。第一种为正常情况下,发送成功后的回调;第二种消息为发送失败、当交换机不存在则发送失败(模拟发送失败),所以将交换机名称修改即可

  1. package com.example.rabbitmq_demo.fabuquerengaoji;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.connection.CorrelationData;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.PathVariable;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. /**
  10. * @ClaseName: ProducerController$
  11. * @Description:消息生产者
  12. * @Author: wuhs
  13. * @Date: 2023/8/16$ 14:58$
  14. */
  15. @Slf4j
  16. @RestController
  17. @RequestMapping("/confirm")
  18. public class ProducerController {
  19. @Autowired
  20. private RabbitTemplate rabbitTemplate;
  21. // @PathVariable主要作用:映射URL绑定的占位符
  22. @RequestMapping("/sendMessage/{message}")
  23. public void sendMessage(@PathVariable String message) {
  24. //正常发送
  25. CorrelationData correlationData = new CorrelationData("1");
  26. rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);
  27. //发送失败-交换机不存在的情况
  28. CorrelationData correlationData2 = new CorrelationData("2");
  29. rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"2", ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData2);
  30. log.info("发送的消息为:{}", message);
  31. }
  32. }

测试结果: 

 如果是routing-key错误,这种情况会触发回调嘛?让我们验证一下;修改routing-key为“错误值”

  1. CorrelationData correlationData3 = new CorrelationData("3");
  2. rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"key3", message,correlationData3);

测试结果:

         通过结果可以看出,消息发送成功了,而且也触发了“成功的回调”。但是我们知道的是,由于路由失败,这里消费者并没有对消息进行消费,这是为什么呢?那是因为,在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。解决方式为:通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。具体操作如下

1、application.yml文件中添加消息回退配置

  1. # 发送者开启 return 确认机制
  2. publisher-returns: true

 2、实现RabbitTemplate中的方法接口ReturnCallback,并实现“returnedMessage”方法,最后将类注入到RabbitTemplate的RabbitTemplate中,详细代码如下:

  1. package com.example.rabbitmq_demo.fabuquerengaoji;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.PostConstruct;
  9. /**
  10. * @ClaseName: MyCallBack$
  11. * @Description:
  12. * @Author: wuhs
  13. * @Date: 2023/8/16$ 16:17$
  14. */
  15. @Slf4j
  16. @Component
  17. public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  18. @Autowired
  19. private RabbitTemplate rabbitTemplate;
  20. @PostConstruct
  21. public void init() {
  22. // 注入
  23. rabbitTemplate.setConfirmCallback(this);
  24. rabbitTemplate.setReturnCallback(this);
  25. }
  26. /**
  27. * 交换机确认回调方法
  28. * 1、发消息 交换机接收到了消息 回调
  29. * 1.1 correlationData 保存回调消息的ID及相关信息
  30. * 1.2 交换机收到消息 ack=true
  31. * 2、发消息 交换机接收失败了 回调
  32. * 2.1 correlationData 保存回调消息的ID及相关信息
  33. * 2.2 交换机接收到消息 ack=false
  34. * 2.3 reason 失败的原因
  35. */
  36. @Override
  37. public void confirm(CorrelationData correlationData, boolean ack, String reason) {
  38. String id = correlationData != null ? correlationData.getId() : "";
  39. if (ack) {
  40. log.info("交换机已经收到了ID为:{}的消息", id);
  41. } else {
  42. log.info("交换机没有收到了ID为:{}发的的消息,失败的原因是:{}", id, reason);
  43. }
  44. }
  45. //可以在当消息传递的过长中不可达目的地时将消息返回给生产者
  46. // 只有不可待目的地的时候 才进行回退
  47. @Override
  48. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  49. log.info("消息{},被交换机{}退回,退回原因:{},路由key:{}", message, exchange, replyText, routingKey);
  50. }
  51. }

测试结果:

2023-08-17 10:36:32.476  INFO 21108 --- [221.94.214:5672] c.e.r.fabuquerengaoji.MyCallBack : 消息(Body:'消息确认发布测试' MessageProperties [headers={spring_returned_message_correlation=3}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),被交换机confirm_exchange退回,退回原因:NO_ROUTE,路由key:key1key3

问题解决!~ 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/798540
推荐阅读
相关标签
  

闽ICP备14008679号