当前位置:   article > 正文

RabbitMQ发布确认和消息回退(6)

RabbitMQ发布确认和消息回退(6)

概念

发布确认原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

两种模式

1.简单确认模式(Simple Publisher Confirm)

在简单确认模式下,每次生产者发送一条消息到 RabbitMQ,都会立即等待 RabbitMQ 返回一个确认消息。如果消息成功发送到 RabbitMQ 服务器上的交换机,并且至少一个队列接收到了消息,RabbitMQ 就会返回一个确认消息给生产者。否则,如果消息发送失败,则会返回一个 Nack 消息。在这种模式下,生产者可以针对每一条消息都进行确认处理,确保消息是否被正确地发送到了 RabbitMQ。

3.批量确认模式(Publisher Confirm with Batch)

在批量确认模式下,生产者可以将一批消息发送到 RabbitMQ,然后等待一段时间后再收到确认消息。这样可以降低每条消息发送的确认成本,并提高性能。批量确认模式通过指定一个大小来定义批量确认的数量,当达到指定的数量后,RabbitMQ 会一次性发送确认消息给生产者。这种模式适用于需要发送大量消息的场景,可以减少确认消息的数量,提高消息发送的效率。

这两种发布确认模式在实现上有一些不同,可以根据实际的业务需求和性能要求来选择合适的模式。简单确认模式更适用于需要对每一条消息进行实时确认的场景,而批量确认模式适用于需要发送大量消息并且希望降低确认消息成本的场景。

开启方法

-- 确认模式配置

spring.rabbitmq.publisher-confirm-type: correlated

  • NONE

禁用发布确认模式,是默认值。

  • CORRELATED

发布消息成功到交换器后会触发回调方法。

  • SIMPLE

测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。(相当于单一发布)

  1. spring:
  2. # 配置RabbitMQ
  3. rabbitmq:
  4. host: 192.168.0.70
  5. port: 5674
  6. username: guest
  7. password: guest
  8. # 虚拟主机
  9. virtual-host: my_vhost
  10. # 开启确认模式
  11. publisher-confirm-type: correlated

 测试

为了方便测试,此机制单独编写类去测试,使用的模式为路由模式(其他模式也可以)

1.创建回调函数

  1. package com.model.callback;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.connection.CorrelationData;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @Author: Haiven
  8. * @Time: 2024/4/22 17:26
  9. * @Description: TODO
  10. */
  11. @Component
  12. @Slf4j
  13. public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {
  14. /**
  15. * 被调用的回调方法
  16. * @param correlationData 相关配置信息
  17. * @param ack 交换机是否成功收到消息 可以根据 ack 做相关的业务逻辑处理
  18. * @param cause 失败原因
  19. */
  20. @Override
  21. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  22. System.out.println("消息推送回调:配置信息="+correlationData+";结果="+ack+"失败原因="+cause);
  23. if(ack){
  24. //消息推送成功
  25. System.out.println("消息推送成功");
  26. }else{
  27. System.out.println("消息推送失败:" + cause);
  28. }
  29. }
  30. }

回调函数会在消息推送到队列后调用

 2.配置回调函数

将上述回调函数设置到mq的配置中,这里再RabbitmqConfig文件中配置

  1. package com.model.config;
  2. import com.model.callback.MyConfirmCallBack;
  3. import org.springframework.amqp.core.Queue;
  4. import org.springframework.amqp.core.QueueBuilder;
  5. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Value;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import javax.annotation.Resource;
  11. /**
  12. * @Author: Haiven
  13. * @Time: 2024/4/18 17:28
  14. * @Description: TODO
  15. */
  16. @Configuration
  17. public class RabbitmqConfig {
  18. @Value("${rabbitmq.work.queue}")
  19. private String workQueue;
  20. @Resource
  21. private MyConfirmCallBack myConfirmCallBack;
  22. /**
  23. * 工作模式的队列
  24. * @return 队列
  25. */
  26. @Bean(name = "workQueue")
  27. public Queue getWorkQueue(){
  28. return QueueBuilder.durable(workQueue).build();
  29. }
  30. @Bean
  31. public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
  32. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  33. rabbitTemplate.setConnectionFactory(connectionFactory);
  34. // 设置开启 Mandatory 强制执行调用回调函数
  35. rabbitTemplate.setMandatory(true);
  36. //设置回调
  37. rabbitTemplate.setConfirmCallback(myConfirmCallBack);
  38. //设置回退回调
  39. return rabbitTemplate;
  40. }
  41. }

 3.创建交换机和队列

这里创建ConfirmConfig配置文件与其他队列进行区分

  1. package com.model.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * @Author: Haiven
  7. * @Time: 2024/4/22 17:22
  8. * @Description: TODO
  9. */
  10. @Configuration
  11. public class ConfirmConfig {
  12. /**
  13. * 测试发布确认的交换机
  14. * @return exchange
  15. */
  16. @Bean(name = "confirmExchange")
  17. public Exchange getConfirmExchange(){
  18. return ExchangeBuilder
  19. .directExchange("exchange_confirm")
  20. .build();
  21. }
  22. /**
  23. * 队列
  24. * @return queue
  25. */
  26. @Bean(name = "confirmQueue")
  27. public Queue getConfirmQueue(){
  28. return QueueBuilder
  29. .durable("queue_confirm")
  30. .build();
  31. }
  32. /**
  33. * 绑定队列
  34. * @return binding
  35. */
  36. @Bean
  37. public Binding getConfirmBinding(){
  38. return BindingBuilder
  39. .bind(getConfirmQueue())
  40. .to(getConfirmExchange())
  41. //路由键 队列1接收debug级别的消息
  42. .with("confirm")
  43. .noargs();
  44. }
  45. }

 4.创建消费者

  1. package com.model.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * @Author: Haiven
  6. * @Time: 2024/4/22 17:31
  7. * @Description: TODO
  8. */
  9. @Component
  10. public class ConfirmConsumer {
  11. @RabbitListener(queues = {"queue_confirm"})
  12. public void routingConfirm(String msg){
  13. System.out.println("消费者 -confirm- 接收消息:" + msg);
  14. }
  15. }

 5.发送并接收消息

  1. package com.model.controller;
  2. import com.code.domain.Response;
  3. import com.model.service.RabbitService;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import javax.annotation.Resource;
  9. /**
  10. * @Author: Haiven
  11. * @Time: 2024/4/19 9:46
  12. * @Description: TODO
  13. */
  14. @RestController
  15. @RequestMapping("/producer")
  16. public class ProducerController {
  17. @Resource
  18. private RabbitService rabbitService;
  19. @GetMapping("/simple")
  20. public Response<Void> simple(String msg){
  21. boolean res = rabbitService.simple(msg);
  22. return res ? Response.success() : Response.fail();
  23. }
  24. @GetMapping("/work")
  25. public Response<Void> work(String msg){
  26. boolean res = rabbitService.work(msg);
  27. return res ? Response.success() : Response.fail();
  28. }
  29. @GetMapping("/sub")
  30. public Response<Void> sub(String msg){
  31. boolean res = rabbitService.sub(msg);
  32. return res ? Response.success() : Response.fail();
  33. }
  34. @GetMapping("/routing")
  35. public Response<Void> routing(String msg, String type){
  36. boolean res = rabbitService.routing(msg, type);
  37. return res ? Response.success() : Response.fail();
  38. }
  39. @GetMapping("/topic")
  40. public Response<Void> topic(String msg, String type){
  41. boolean res = rabbitService.topic(msg, type);
  42. return res ? Response.success() : Response.fail();
  43. }
  44. @GetMapping("/confirm")
  45. public Response<Void> confirm(String msg, String type){
  46. boolean res = rabbitService.confirm(msg, type);
  47. return res ? Response.success() : Response.fail();
  48. }
  49. }
  1. package com.model.service.impl;
  2. import com.model.service.RabbitService;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.stereotype.Service;
  7. import javax.annotation.Resource;
  8. /**
  9. * @Author: Haiven
  10. * @Time: 2024/4/19 10:51
  11. * @Description: TODO
  12. */
  13. @Service
  14. @Slf4j
  15. public class RabbitServiceImpl implements RabbitService {
  16. @Resource
  17. private RabbitTemplate rabbitTemplate;
  18. @Value("${rabbitmq.simple.queue}")
  19. private String simpleQueue;
  20. @Value("${rabbitmq.work.queue}")
  21. private String workQueue;
  22. @Override
  23. public boolean simple(String msg) {
  24. try {
  25. rabbitTemplate.convertAndSend(simpleQueue, msg);
  26. return true;
  27. }catch (Exception e){
  28. e.printStackTrace();
  29. return false;
  30. }
  31. }
  32. @Override
  33. public boolean work(String msg) {
  34. try {
  35. rabbitTemplate.convertAndSend(workQueue, msg);
  36. return true;
  37. }catch (Exception e){
  38. e.printStackTrace();
  39. return false;
  40. }
  41. }
  42. @Override
  43. public boolean sub(String msg) {
  44. try {
  45. //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
  46. rabbitTemplate.convertAndSend("exchange_sub","", msg);
  47. return true;
  48. }catch (Exception e){
  49. e.printStackTrace();
  50. return false;
  51. }
  52. }
  53. @Override
  54. public boolean routing(String msg, String type) {
  55. System.out.println("理由模式发送消息:msg="+msg+",type="+type+"");
  56. try {
  57. //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
  58. rabbitTemplate.convertAndSend("exchange_routing",type, msg);
  59. return true;
  60. }catch (Exception e){
  61. e.printStackTrace();
  62. return false;
  63. }
  64. }
  65. @Override
  66. public boolean topic(String msg, String type) {
  67. System.out.println("主题模式发送消息:msg="+msg+",type="+type+"");
  68. try {
  69. //主题模式会根据 type的通配符进行分发
  70. rabbitTemplate.convertAndSend("exchange_topic",type, msg);
  71. return true;
  72. }catch (Exception e){
  73. e.printStackTrace();
  74. return false;
  75. }
  76. }
  77. @Override
  78. public boolean confirm(String msg, String type) {
  79. System.out.println("发布确认模式发送消息:msg="+msg+",type="+type+"");
  80. try {
  81. rabbitTemplate.convertAndSend("exchange_confirm",type, msg);
  82. return true;
  83. }catch (Exception e){
  84. e.printStackTrace();
  85. return false;
  86. }
  87. }
  88. }

 发送消息

 接收消息

 发送成功后回调函数会执行,即使消费者没有消费该消息,回调函数仍然会执行

 消息回退

当第二条消息推送后消费者是没有消费消息的,虽然推送成功,但是却被丢弃了,而此时生产者需要知道此消息是否被消费成功,所有就使用到了消息回退机制

spring.rabbitmq.publisher-returns=true

1.设置回调函数

MyReturnCallBack

  1. package com.model.callback;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @Author: Haiven
  8. * @Time: 2024/4/23 10:16
  9. * @Description: TODO
  10. */
  11. @Component
  12. @Slf4j
  13. public class MyReturnCallBack implements RabbitTemplate.ReturnCallback {
  14. /**
  15. *
  16. * @param msg 消息对象
  17. * @param errCode 错误码
  18. * @param errMsg 错误信息
  19. * @param exchange 交换机
  20. * @param rout 路由键
  21. */
  22. @Override
  23. public void returnedMessage(Message msg, int errCode, String errMsg, String exchange, String rout) {
  24. log.debug("消息对象={},错误码={},错误消息={},交换机={},路由键={}", msg, errCode, errMsg, exchange, rout);
  25. }
  26. }

 此函数会在消息被丢弃或者消费失败后回调

2.设置到配置中

在RabbitmqConfig配置文件设置

  1. package com.model.config;
  2. import com.model.callback.MyConfirmCallBack;
  3. import com.model.callback.MyReturnCallBack;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.QueueBuilder;
  6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import javax.annotation.Resource;
  12. /**
  13. * @Author: Haiven
  14. * @Time: 2024/4/18 17:28
  15. * @Description: TODO
  16. */
  17. @Configuration
  18. public class RabbitmqConfig {
  19. @Value("${rabbitmq.work.queue}")
  20. private String workQueue;
  21. @Resource
  22. private MyConfirmCallBack myConfirmCallBack;
  23. @Resource
  24. private MyReturnCallBack myReturnCallBack;
  25. /**
  26. * 工作模式的队列
  27. * @return 队列
  28. */
  29. @Bean(name = "workQueue")
  30. public Queue getWorkQueue(){
  31. return QueueBuilder.durable(workQueue).build();
  32. }
  33. @Bean
  34. public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
  35. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  36. rabbitTemplate.setConnectionFactory(connectionFactory);
  37. // 设置开启 Mandatory 强制执行调用回调函数
  38. rabbitTemplate.setMandatory(true);
  39. //设置发布确认
  40. rabbitTemplate.setConfirmCallback(myConfirmCallBack);
  41. //设置消息回退
  42. rabbitTemplate.setReturnCallback(myReturnCallBack);
  43. //设置回退回调
  44. return rabbitTemplate;
  45. }
  46. }

先注入,在设置到rabbitTemplate对象中,这样发送消息时就可以回调

3.发送并接收消息

此处使用confirm交换机测试

发送一条没有路由的消息

 

 此时消息推送成功,但是没有被消费者消费,而是被丢弃,所有消息回退的回调函数执行:

消息对象=(

        Body:'消息',

        MessageProperties : [

                headers={},

                contentType=text/plain,

                contentEncoding=UTF-8,                                           

                contentLength=0,

                receivedDeliveryMode=PERSISTENT,

                priority=0, deliveryTag=0])

错误码=312

错误消息=NO_ROUTE

交换机=exchange_confirm

路由键=unknown

 发送一条路由的消息

 消息推送到交换机成功,并被成功消费,回退消息的回调函数未执行

备份交换机

有了发布确认和回退消息,我们获得了对无法投递消息的感知能力,在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,就可以使用备份交换机

 备份交换机可以理解为 RabbitMQ 中交换机的“备份”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个 备份,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

1.创建备份交换机

为了方便区分,这里新建一个配置类ConfirmBackupConfig,用于创建虚拟机和队列,备份交换机的类型一定要为fanout

  1. package com.model.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.context.annotation.DependsOn;
  6. import org.springframework.core.annotation.Order;
  7. /**
  8. * @Author: Haiven
  9. * @Time: 2024/4/23 15:07
  10. * @Description: TODO
  11. */
  12. @Configuration
  13. public class ConfirmBackupConfig {
  14. /**
  15. * exchange_confirm 交换机的备份交换机
  16. * @return exchange
  17. */
  18. @Bean(name = "confirmBackupExchange")
  19. public Exchange getConfirmBackupExchange(){
  20. return ExchangeBuilder
  21. .fanoutExchange("exchange_confirm_backup")
  22. .build();
  23. }
  24. @Bean("confirmBackupQueue")
  25. public Queue getConfirmBackupQueue(){
  26. return QueueBuilder
  27. .durable("queue_confirm_backup")
  28. .build();
  29. }
  30. @Bean("confirmBackupBinding")
  31. public Binding getConfirmBackupBinding(){
  32. return BindingBuilder
  33. .bind(getConfirmBackupQueue())
  34. .to(getConfirmBackupExchange())
  35. .with("")
  36. .noargs();
  37. }
  38. }

 2.绑定备份交换机

这里我们用上面的消息回退测试用的交换机进行测试,就不额外再创建了,在ConfirmConfig配置文件中直接绑定:

.withArgument("alternate-exchange", "exchange_confirm_backup")

在创建交换机的时候直接指定 alternate-exchange,exchange_confirm_backup为备份交换的名称

  1. package com.model.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.boot.autoconfigure.AutoConfigureAfter;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.context.annotation.Import;
  7. /**
  8. * @Author: Haiven
  9. * @Time: 2024/4/22 17:22
  10. * @Description: TODO
  11. */
  12. @Configuration
  13. public class ConfirmConfig {
  14. /**
  15. * 测试发布确认的交换机
  16. * @return exchange
  17. */
  18. @Bean(name = "confirmExchange")
  19. public Exchange getConfirmExchange(){
  20. return ExchangeBuilder
  21. .directExchange("exchange_confirm")
  22. .withArgument("alternate-exchange", "exchange_confirm_backup")
  23. .durable(true)
  24. .build();
  25. }
  26. /**
  27. * 队列
  28. * @return queue
  29. */
  30. @Bean(name = "confirmQueue")
  31. public Queue getConfirmQueue(){
  32. return QueueBuilder
  33. .durable("queue_confirm")
  34. .build();
  35. }
  36. /**
  37. * 绑定队列
  38. * @return binding
  39. */
  40. @Bean
  41. public Binding getConfirmBinding(){
  42. return BindingBuilder
  43. .bind(getConfirmQueue())
  44. .to(getConfirmExchange())
  45. //路由键 队列1接收debug级别的消息
  46. .with("confirm")
  47. .noargs();
  48. }
  49. }

 由于之前创建exchange_confirm交换机的时候没有指定备份交换机,所以这里要先将该交换机删掉,然后重新创建,备份交换机一定要在创建的时候指定

 进入控制台删除,不想删可创建新的交换机用于测试

 3.备份交换机消费者

直接在ConfirmConsumer配置文件中声明confirmBackupConsumer

  1. package com.model.listener;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @Author: Haiven
  7. * @Time: 2024/4/22 17:31
  8. * @Description: TODO
  9. */
  10. @Component
  11. @Slf4j
  12. public class ConfirmConsumer {
  13. @RabbitListener(queues = {"queue_confirm"})
  14. public void confirmConsumer(String msg){
  15. System.out.println("消费者 -confirm- 接收消息:" + msg);
  16. }
  17. @RabbitListener(queues = {"queue_confirm_backup"})
  18. public void confirmBackupConsumer(String msg){
  19. log.debug("消费者 -- 备份队列 -- 接收消息:" + msg);
  20. }
  21. }

4.发送消息测试

发送一条没有路由的消息

1.可以看到发布确认的消息回调执行,说明已经推送到交换机

2.但是消息回退的回调没有执行,但该消息没有被confirm的消费者消费

3.该消息被备份交换机的消费者消费

有了备份交换机,消息如果消费失败,消息不会回退,而会被备份队列的消费者接收

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/496760
推荐阅读
相关标签
  

闽ICP备14008679号