当前位置:   article > 正文

RabbitMq 消息确认和退回机制

RabbitMq 消息确认和退回机制

一、Rabbit中消息确认和退回机制

1、发布确认

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,(单个)如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号。

1.1 开启发布确认

//开启发布确认
channel.confirmSelect();

1.2 发布确认的两种方式

      同步确认

      就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

  1. public static void publishMessageIndividually() throws Exception {
  2. Channel channel = RabbitMqUtils.getChannel();
  3. //队列声明
  4. String queueName = UUID.randomUUID().toString();
  5. channel.queueDeclare(queueName, true, false, false, null);
  6. //开启发布确认
  7. channel.confirmSelect();
  8. long begin = System.currentTimeMillis();
  9. for (int i = 0; i < MESSAGE_COUNT; i++) {
  10. String message = i + "";
  11. channel.basicPublish("", queueName, null, message.getBytes());
  12. //服务端返回 false 或超时时间内未返回,生产者可以消息重发
  13. boolean flag = channel.waitForConfirms();
  14. if (flag) {
  15. System.out.println("消息发送成功");
  16. }
  17. }
  18. long end = System.currentTimeMillis();
  19. System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
  20. }
  异步确认 

    他是利用回调函数来达到消息可靠性传递的

  1. // 异步确认发布
  2. private static void publishMessageAsync() throws Exception {
  3. Channel channel = RabbitMqUtils.getChannel();
  4. //队列声明
  5. String queueName = UUID.randomUUID().toString();
  6. channel.queueDeclare(queueName, true, false, false, null);
  7. //开启发布确认
  8. channel.confirmSelect();
  9. /**线程安全有序的一个哈希表,适用于高并发的情况下
  10. 1.轻松的将序号与消息进行关联
  11. 2.轻松批量删除条目,只要给到序号
  12. 3.支持高并发(多线程)
  13. **/
  14. //消息确认成功,回调函数
  15. ConcurrentSkipListMap<Long,String> outstandingConfirms =
  16. new ConcurrentSkipListMap<>();
  17. ConfirmCallback ackCallback = (deliveryTag , multiple)->{ //确认了多少条,multiple:批量或单个
  18. if (multiple){ //批量的
  19. //2.轻松批量删除条目,只要给到序号
  20. ConcurrentNavigableMap<Long,String> confirmed =
  21. outstandingConfirms.headMap(deliveryTag); //消息的序号
  22. confirmed.clear();
  23. }else { //单个的
  24. outstandingConfirms.remove(deliveryTag);
  25. }
  26. System.out.println("确认的消息:"+deliveryTag);
  27. };
  28. //消息确认失败,回调函数
  29. /**
  30. * 1.消息的标记
  31. * 2.是否为批量确认
  32. */
  33. ConfirmCallback nackCallback = (deliveryTag , multiple)->{
  34. //3、打印一下未确认的消息都有哪些
  35. String message = outstandingConfirms.get(deliveryTag);
  36. System.out.println("未确认的消息是:"+message+":::未确认的消息:"+deliveryTag);
  37. };
  38. //准备消息的监听器 ,监听哪些消息成功了, 哪些失败了
  39. channel.addConfirmListener( ackCallback, nackCallback);
  40. //开始时间
  41. long begin = System.currentTimeMillis();
  42. //发送消息
  43. for (int i = 0; i < MESSAGE_COUNT; i++) {
  44. String message = i + "";
  45. channel.basicPublish("", queueName, null, message.getBytes());
  46. //1.轻松的将序号与消息进行关联,将每一条消息都存放hash表里面
  47. outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
  48. }
  49. //结束时间
  50. long end = System.currentTimeMillis();
  51. System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
  52. }

2、 退回机制

退回模式(return)说的是当消息到达交换机后,但是没有找到匹配的队列时,将消息回退给生产者。

默认情况下,如果消息没有匹配到队列会直接丢弃,采用退回模式可以在生产者端监听改消息是否被成功投递到队列中

  1. channel.addReturnListener(new ReturnCallback() {
  2. @Override
  3. public void handle(Return returnMessage) {
  4. System.out.println("消息被回退,原因:"+returnMessage.getReplyText());
  5. System.out.println(returnMessage.getExchange()); // 交换机
  6. System.out.println(returnMessage.getReplyCode()); // 返回原因的代码
  7. System.out.println(returnMessage.getReplyText()); // 返回信息,例如NO_ROUTE
  8. System.out.println(returnMessage.getRoutingKey()); // 路由KEY
  9. }
  10. });

二、Spring boot 中Rabbit 实现消息确认和退回机制

1、 开启配置

  1. spring:
  2. #配置rabbitMq 服务器
  3. rabbitmq:
  4. host: 127.0.0.1
  5. port: 5672
  6. username: root
  7. password: root
  8. #虚拟host 可以不设置,使用server默认host
  9. virtual-host: JCcccHost
  10. #确认消息已发送到交换机(Exchange)
  11. publisher-confirm-type: correlated
  12. #确认消息已发送到队列(Queue)
  13. publisher-returns: true

2、配置消息确认和退回机制

  1. import org.springframework.amqp.core.Message;
  2. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  3. import org.springframework.amqp.rabbit.connection.CorrelationData;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8. * @Author : JCccc
  9. * @CreateTime : 2019/9/3
  10. * @Description :
  11. **/
  12. @Configuration
  13. public class RabbitConfig {
  14. @Bean
  15. public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
  16. RabbitTemplate rabbitTemplate = new RabbitTemplate();
  17. rabbitTemplate.setConnectionFactory(connectionFactory);
  18. // 配置发布到交换机的确认
  19. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  20. /**
  21. *correlationData :客户端在发送原始消息时提供的对象。
  22. *ack:exchange交换机是否成功收到了消息。true成功,false代表失败。
  23. *cause:失败原因。
  24. */
  25. @Override
  26. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  27. System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);
  28. System.out.println("ConfirmCallback: "+"确认情况:"+ack);
  29. System.out.println("ConfirmCallback: "+"原因:"+cause);
  30. }
  31. });
  32. // 配置交换机没有找到对应的消息队列时,消息退回时的处理
  33. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  34. @Override
  35. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  36. System.out.println("ReturnCallback: "+"消息:"+message);
  37. System.out.println("ReturnCallback: "+"回应码:"+replyCode);
  38. System.out.println("ReturnCallback: "+"回应信息:"+replyText);
  39. System.out.println("ReturnCallback: "+"交换机:"+exchange);
  40. System.out.println("ReturnCallback: "+"路由键:"+routingKey);
  41. }
  42. });
  43. return rabbitTemplate;
  44. }
  45. }

2.1 其他方式

  1. package com.student.rabbitmq.config;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.connection.CorrelationData;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.PostConstruct;
  8. /**
  9. * 第二种方式
  10. */
  11. @Component
  12. public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  13. @Autowired
  14. private RabbitTemplate rabbitTemplate;
  15. // 将我们实现的MyCallBack接口注入到RabbitTemplate中
  16. @PostConstruct
  17. public void init() {
  18. // 设置确认消息交给谁处理
  19. rabbitTemplate.setConfirmCallback(this);
  20. // 设置回退消息交给谁处理
  21. rabbitTemplate.setReturnCallback(this);
  22. }
  23. /**
  24. * 交换机确认回调方法
  25. *
  26. * @param correlationData 保存回调消息的ID以及相关信息
  27. * @param ack 表示交换机是否收到消息(true表示收到)
  28. * @param cause 表示消息接收失败的原因(收到消息为null)
  29. */
  30. @Override
  31. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  32. String id = correlationData != null ? correlationData.getId() : "";
  33. if (ack) {
  34. System.out.println("交换机已经收到ID为:{}的消息");
  35. } else {
  36. System.out.println("交换机还未收到ID为:{}的消息,原因为:{}" + cause);
  37. }
  38. }
  39. /**
  40. * 路由出现问题的消息回退方法
  41. *
  42. */
  43. @Override
  44. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  45. System.out.println(new String(message.getBody())+
  46. exchange+
  47. replyText+
  48. routingKey);
  49. }
  50. }

三、参考

发布确认—发布确认逻辑和发布确认的策略

消息可靠性之发布确认、退回机制_rabbittemplate 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/800220
推荐阅读
相关标签
  

闽ICP备14008679号