当前位置:   article > 正文

【初始RabbitMQ】高级发布确认的实现

【初始RabbitMQ】高级发布确认的实现

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢

发布确认SpringBoot版本

确认机制方案

代码架构图

配置文件

  1. spring.rabbitmq.host=118.31.6.132
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=admin
  4. spring.rabbitmq.password=123
  5. spring.rabbitmq.publisher-confirm-type=correlated

在配置文件当中需要添加 spring.rabbitmq.publisher-confirm-type=correlated

  • NONE 禁用发布确认模式,是默认值
  • CORRELATED 发布消息成功到交换器后会触发回调方法
  • SIMPLE 经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

引入依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>3.2.2</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.example</groupId>
  12. <artifactId>springboot-rabbitmq</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>demo</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>17</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-amqp</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-web</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.projectlombok</groupId>
  30. <artifactId>lombok</artifactId>
  31. <optional>true</optional>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.springframework.boot</groupId>
  35. <artifactId>spring-boot-starter-test</artifactId>
  36. <scope>test</scope>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.springframework.amqp</groupId>
  40. <artifactId>spring-rabbit-test</artifactId>
  41. <scope>test</scope>
  42. </dependency>
  43. <dependency>
  44. <groupId>org.springframework.boot</groupId>
  45. <artifactId>spring-boot-autoconfigure</artifactId>
  46. <version>3.2.0</version>
  47. </dependency>
  48. </dependencies>
  49. <build>
  50. <plugins>
  51. <plugin>
  52. <groupId>org.springframework.boot</groupId>
  53. <artifactId>spring-boot-maven-plugin</artifactId>
  54. </plugin>
  55. </plugins>
  56. </build>
  57. </project>

添加配置类

  1. package com.example.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class ConfirmConfig {
  8. public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
  9. public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
  10. //声明确认队列
  11. @Bean
  12. public Queue confirmQueue(){
  13. return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
  14. }
  15. //声明确认交换机
  16. @Bean
  17. public DirectExchange confirmExchange(){
  18. return new DirectExchange(CONFIRM_EXCHANGE_NAME);
  19. }
  20. //声明确认队列绑定关系
  21. @Bean
  22. public Binding queueBinding(@Qualifier("confirmQueue")Queue queue,
  23. @Qualifier("confirmExchange")DirectExchange directExchange){
  24. return BindingBuilder.bind(queue).to(directExchange).with("key1");
  25. }
  26. }

回调接口

  1. package com.example.component;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.connection.CorrelationData;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. @Slf4j
  8. public class MyCallBack implements RabbitTemplate.ConfirmCallback {
  9. /**
  10. * 交换机不管是否收到消息的一个回调方法
  11. * CorrelationData
  12. * 消息相关数据
  13. * ack
  14. * 交换机是否收到消息
  15. */
  16. @Override
  17. public void confirm(CorrelationData correlationData,boolean ack,String cause){
  18. String id = correlationData!=null?correlationData.getId():"";
  19. if(ack){
  20. log.info("交换机已经收到 id 为:{}的消息",id);
  21. }else{
  22. log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
  23. }
  24. }
  25. }

消息生产者

  1. package com.example.controller;
  2. import com.example.component.MyCallBack;
  3. import jakarta.annotation.PostConstruct;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.web.bind.annotation.GetMapping;
  9. import org.springframework.web.bind.annotation.PathVariable;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. @RestController
  13. @RequestMapping("/confirm")
  14. @Slf4j
  15. public class producer {
  16. public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
  17. @Autowired
  18. private RabbitTemplate rabbitTemplate;
  19. @Autowired
  20. private MyCallBack myCallBack;
  21. @PostConstruct //在类实例被创建后(通过依赖注入完成,并且在任何其他初始化方法之前),容器会自动调用这个方法
  22. public void init(){
  23. rabbitTemplate.setConfirmCallback(myCallBack);
  24. }
  25. @GetMapping("/sendMessage/{message}")
  26. public void sendMessage(@PathVariable String message){
  27. //指定消息id为1
  28. CorrelationData correlationData1 = new CorrelationData("1");
  29. String routingKey = "key1";
  30. rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,
  31. message+routingKey,correlationData1);
  32. log.info("发送消息内容:{}",message+routingKey);
  33. CorrelationData correlationData2 = new CorrelationData("2");
  34. routingKey = "key2";
  35. rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,
  36. message+routingKey,correlationData2);
  37. log.info("发送消息内容:{}",message+routingKey);
  38. }
  39. }

消息消费者

  1. package com.example.component;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. @Slf4j
  8. public class ConfirmConsumer {
  9. public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
  10. @RabbitListener(queues = CONFIRM_QUEUE_NAME)
  11. public void receiveMsg(Message message){
  12. String msg=new String(message.getBody());
  13. log.info("接受到队列 confirm.queue 消息:{}",msg);
  14. }
  15. }

结果分析

访问:http://127.0.0.1:8080/confirm/sendMessage/%E4%BD%A0%E5%A5%BD

 

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为 "key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为 第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条 消息被直接丢弃了 

回退消息

Mandatory参数

在仅开启了生产者确认机制的情况下,交换还击接收到消息之后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何 让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参 数可以在当消息传递过程中不可达目的地时将消息返回给生产者

消息生产者代码

  1. package com.example.controller;
  2. import jakarta.annotation.PostConstruct;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.ReturnedMessage;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.web.bind.annotation.GetMapping;
  9. import org.springframework.web.bind.annotation.PathVariable;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import java.util.UUID;
  13. @RestController
  14. @Slf4j
  15. @RequestMapping("/confirm")
  16. public class MessageProducer implements RabbitTemplate.ConfirmCallback
  17. ,RabbitTemplate.ReturnsCallback{
  18. @Autowired
  19. private RabbitTemplate rabbitTemplate;
  20. @PostConstruct
  21. private void init(){
  22. rabbitTemplate.setConfirmCallback(this);
  23. /**
  24. * true:交换机无法将消息进行路由时,会将消息返回给生产者
  25. * false:如果发现纤细无法进行路由时,直接的丢弃
  26. */
  27. rabbitTemplate.setMandatory(true);
  28. //设置回退消息给谁处理
  29. rabbitTemplate.setReturnsCallback(this);
  30. }
  31. @GetMapping("/sendMessage/{message}")
  32. public void sendMessage(@PathVariable String message){
  33. //让消息绑定一个id值
  34. CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
  35. rabbitTemplate.convertAndSend("confirm.exchange","key1"
  36. ,message+"key1",correlationData1);
  37. log.info("发送消息 id 为:{}内容为{}",correlationData1.getId(),message+"key1");
  38. CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());
  39. rabbitTemplate.convertAndSend("confirm.exchange","key2",message+"key2",correlationData2)
  40. ;
  41. log.info("发送消息 id 为:{}内容为{}",correlationData2.getId(),message+"key2");
  42. }
  43. @Override
  44. public void confirm(CorrelationData correlationData,boolean ack,String cause){
  45. String id = correlationData != null ? correlationData.getId() : "";
  46. if (ack) {
  47. log.info("交换机收到消息确认成功, id:{}", id);
  48. } else {
  49. log.error("消息 id:{}未成功投递到交换机,原因是:{}", id, cause);
  50. }
  51. }
  52. @Override
  53. public void returnedMessage(ReturnedMessage returnedMessage) {
  54. log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}",
  55. new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText()
  56. ,returnedMessage.getExchange(), returnedMessage.getRoutingKey());
  57. }
  58. }

 回调接口

  1. package com.example.component;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.ReturnedMessage;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. @Slf4j
  9. public class MyCallBack implements RabbitTemplate.ConfirmCallback
  10. ,RabbitTemplate.ReturnsCallback{
  11. /**
  12. * 交换机不管是否收到消息的一个回调方法
  13. * CorrelationData
  14. * 消息相关数据
  15. * ack
  16. * 交换机是否收到消息
  17. */
  18. @Override
  19. public void confirm(CorrelationData correlationData,boolean ack,String cause){
  20. String id = correlationData!=null?correlationData.getId():"";
  21. if(ack){
  22. log.info("交换机已经收到 id 为:{}的消息",id);
  23. }else{
  24. log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
  25. }
  26. }
  27. @Override
  28. public void returnedMessage(ReturnedMessage returnedMessage) {
  29. log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}",
  30. new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText()
  31. ,returnedMessage.getExchange(), returnedMessage.getRoutingKey());
  32. }
  33. }

 消息消费者

  1. package com.example.component;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. @Slf4j
  8. public class ConfirmConsumer {
  9. public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
  10. @RabbitListener(queues = CONFIRM_QUEUE_NAME)
  11. public void receiveMsg(Message message){
  12. String msg=new String(message.getBody());
  13. log.info("接受到队列 confirm.queue 消息:{}",msg);
  14. }
  15. }

结果分析

备份交换机

备份交换机是 RabbitMQ 中的一个机制,用于处理无法被路由到队列的消息。它可以看作是交换机的“备胎”,当交换机接收到无法路由的消息时,会将这些消息转发到备份交换机中,由备份交换机来进行处理

通常情况下,备份交换机的类型是 Fanout,这意味着它会将所有接收到的消息广播给与其绑定的所有队列。因此,当备份交换机接收到无法路由的消息时,这些消息会被发送到与备份交换机绑定的队列中

通过设置备份交换机,我们可以将无法被路由的消息存储到一个特定的队列中,然后可以通过监控这个队列来进行报警或者手动处理。这样做既可以避免丢失消息,又不会增加生产者的复杂性,同时也提高了系统的稳定性和可靠性

代码架构图

 修改配置类

  1. package com.example.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class ConfirmConfig {
  8. public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
  9. public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
  10. public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
  11. public static final String BACKUP_QUEUE_NAME = "backup.queue";
  12. public static final String WARNING_QUEUE_NAME = "warning.queue";
  13. //声明确认队列
  14. @Bean
  15. public Queue confirmQueue() {
  16. return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
  17. }
  18. //声明确认队列绑定关系
  19. @Bean
  20. public Binding queueBinding(@Qualifier("confirmQueue")Queue queue,
  21. @Qualifier("confirmExchange")DirectExchange directExchange){
  22. return BindingBuilder.bind(queue).to(directExchange).with("key1");
  23. }
  24. //声明备份交换机
  25. @Bean
  26. public FanoutExchange backupExchange(){
  27. return new FanoutExchange(BACKUP_EXCHANGE_NAME);
  28. }
  29. //声明确认交换机并且绑定备份交换机
  30. @Bean
  31. public DirectExchange confirmExchange(){
  32. ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
  33. .durable(true)
  34. //设置该交换机的备用交换机
  35. .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
  36. return (DirectExchange) exchangeBuilder.build();
  37. }
  38. //声明警告队列
  39. @Bean
  40. public Queue warningQueue(){
  41. return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
  42. }
  43. // 声明报警队列绑定关系
  44. @Bean
  45. public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
  46. @Qualifier("backupExchange") FanoutExchange
  47. backupExchange){
  48. return BindingBuilder.bind(queue).to(backupExchange);
  49. }
  50. // 声明备份队列
  51. @Bean("backQueue")
  52. public Queue backQueue(){
  53. return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
  54. }
  55. // 声明备份队列绑定关系
  56. @Bean
  57. public Binding backupBinding(@Qualifier("backQueue") Queue queue,
  58. @Qualifier("backupExchange") FanoutExchange backupExchange){
  59. return BindingBuilder.bind(queue).to(backupExchange);
  60. }
  61. }

报警消费者

  1. package com.example.controller;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.web.bind.annotation.RestController;
  6. @RestController
  7. @Slf4j
  8. public class WarningConsumer {
  9. public static final String WARNING_QUEUE_NAME = "warning.queue";
  10. @RabbitListener(queues = WARNING_QUEUE_NAME)
  11. public void receiveWarningMsg(Message message) {
  12. String msg = new String(message.getBody());
  13. log.error("报警发现不可路由消息:{}", msg);
  14. }
  15. }

 测试注意事项

直接运行会报错,因为我们之前创建过此队列,如要修改只能删除,或者新建队列

我们可以登录管理后台,删除该队列 

结果分析

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/157904
推荐阅读
相关标签
  

闽ICP备14008679号