当前位置:   article > 正文

SpringBoot+RabbitMQ 实现手动消息确认(ACK)

message.getmessageproperties().getdeliverytag()

点击上方“芋道源码”,选择“设为星标

管她前浪,还是后浪?

能浪的浪,才是好浪!

每天 10:33 更新文章,每天掉亿点点头发...

源码精品专栏

 

来源:writing-bugs.blog.csdn.net/

article/details/103701101

96288792746191cf2bcb54e3484c55b9.jpeg


一、前言

前几天我研究了关于springboot整合简单消息队列,实现springboot推送消息至队列中,消费者成功消费。同时也加了消息转发器,对消息转发器各种类型的配置等做了总结。

但是,主要还有一点,我一直存在疑问:如何确保消息成功被消费者消费?

说到这里,我相信很多人会说使用ack啊,关闭队列自动删除啊什么的。主要是道理大家都懂,我要实际的代码,网上找了半天,和我设想的有很大差异,还是自己做研究总结吧。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro

  • 视频教程:https://doc.iocoder.cn/video/

二、准备

本次写案例,就按照最简单的方式,direct方式进行配置吧,实际流程如下所示:

fab9a57d4d3b1bfa04f95840bcb01072.png
  • 消息转发器类型: direct直连方式。

  • 消息队列: 暂时采取公平分发方式。

  • 实现流程: 消息生产者生产的消息发送至队列中,由两个消费者获取并消费,消费完成后,清楚消息队列中的消息。

所以我们接下来先写配置和demo。

2.1、依赖引入

再一般的springboot 2.1.4项目中,添加一个pom依赖。

  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2.2、连接yml的配置

我们这边暂时只有一个rabbitmq,所以连接操作,基本rabbitmq的信息配置问题直接再yml中编写就可以了。

  1. spring:
  2.   rabbitmq:
  3.     host: 127.0.0.1
  4.     port: 5672
  5.     username: xiangjiao
  6.     password: bunana
  7.     virtual-host: /xiangjiao
  8.     publisher-confirms: true   #开启发送确认
  9.     publisher-returns: true  #开启发送失败回退
  10.     
  11.     #开启ack
  12.     listener:
  13.       direct:
  14.         acknowledge-mode: manual
  15.       simple:
  16.         acknowledge-mode: manual #采取手动应答
  17.         #concurrency: 1 # 指定最小的消费者数量
  18.         #max-concurrency: 1 #指定最大的消费者数量
  19.         retry:
  20.           enabled: true # 是否支持重试

2.3、config注入配置

我们根据图示

32a041346df27912086b502f0c0ae0ed.png

知道我们必须配置以下东西:

  • 一个消息转发器,我们取名directExchangeTx

  • 一个消息队列,取名directQueueTx,并将其绑定至指定的消息转发器上。

所以我们的配置文件需要这么写:

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.DirectExchange;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.beans.factory.annotation.Qualifier;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9.  * 直连交换机,发送指定队列信息,但这个队列后有两个消费者同时进行消费
  10.  * @author 7651
  11.  *
  12.  */
  13. @Configuration
  14. public class DirectExchangeTxQueueConfig {
  15.  
  16.  @Bean(name="getDirectExchangeTx")
  17.  public DirectExchange getDirectExchangeTx(){
  18.   return new DirectExchange("directExchangeTx"truefalse);
  19.  }
  20.  
  21.  @Bean(name="getQueueTx")
  22.  public Queue getQueueTx(){
  23.   return new Queue("directQueueTx"truefalsefalse);
  24.  }
  25.  
  26.  @Bean
  27.  public Binding getDirectExchangeQueueTx(
  28.    @Qualifier(value="getDirectExchangeTx") DirectExchange getDirectExchangeTx,
  29.    @Qualifier(value="getQueueTx") Queue getQueueTx){
  30.   return BindingBuilder.bind(getQueueTx).to(getDirectExchangeTx).with("directQueueTxRoutingKey");
  31.  }
  32. }

2.4、消费者的配置

有了队列和消息转发器,消息当然需要去消费啊,所以我们接下来配置消息消费者。

bad25d8c42eb5080943b53bbe65b7f7f.png

从图中,我们看出,我们需要配置两个消息消费者,同时监听一个队列,所以我们的配置类为:

消费者一:

  1. import java.io.IOException;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import com.rabbitmq.client.Channel;
  7. @Component
  8. @RabbitListener(queues="directQueueTx")
  9. public class Consumer1 {
  10.  @RabbitHandler
  11.  public void process(String msg,Channel channel, Message message) throws IOException {
  12.   //拿到消息延迟消费
  13.   try {
  14.    Thread.sleep(1000*1);
  15.   } catch (InterruptedException e) {
  16.    e.printStackTrace();
  17.   }
  18.   
  19.   
  20.   try {
  21.    /**
  22.     * 确认一条消息:<br>
  23.     * channel.basicAck(deliveryTag, false); <br>
  24.     * deliveryTag:该消息的index <br>
  25.     * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息 <br>
  26.     */
  27.    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  28.    System.out.println("get msg1 success msg = "+msg);
  29.    
  30.   } catch (Exception e) {
  31.    //消费者处理出了问题,需要告诉队列信息消费失败
  32.    /**
  33.     * 拒绝确认消息:<br>
  34.     * channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ; <br>
  35.     * deliveryTag:该消息的index<br>
  36.     * multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。<br>
  37.     * requeue:被拒绝的是否重新入队列 <br>
  38.     */
  39.    channel.basicNack(message.getMessageProperties().getDeliveryTag(),
  40.      falsetrue);
  41.    System.err.println("get msg1 failed msg = "+msg);
  42.    
  43.    /**
  44.     * 拒绝一条消息:<br>
  45.     * channel.basicReject(long deliveryTag, boolean requeue);<br>
  46.     * deliveryTag:该消息的index<br>
  47.     * requeue:被拒绝的是否重新入队列 
  48.     */
  49.    //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  50.   }
  51.  }
  52. }

消息消费者二:

  1. import java.io.IOException;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import com.rabbitmq.client.Channel;
  7. @Component
  8. @RabbitListener(queues="directQueueTx")
  9. public class Consumer2 {
  10.  @RabbitHandler
  11.  public void process(String msg,Channel channel, Message message) throws IOException {
  12.   //拿到消息延迟消费
  13.   try {
  14.    Thread.sleep(1000*3);
  15.   } catch (InterruptedException e) {
  16.    e.printStackTrace();
  17.   }
  18.   
  19.   
  20.   try {
  21.    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  22.    System.out.println("get msg2 success msg = "+msg);
  23.    
  24.   } catch (Exception e) {
  25.    //消费者处理出了问题,需要告诉队列信息消费失败
  26.    channel.basicNack(message.getMessageProperties().getDeliveryTag(),
  27.      falsetrue);
  28.    System.err.println("get msg2 failed msg = "+msg);
  29.   }
  30.  }
  31. }

两个消费者之间唯一的区别在于两者获取消息后,延迟时间不一致。

2.5、消息生产者

有了消息消费者,我们需要有一个方式提供消息并将消息推送到消息队列中。

  1. public interface IMessageServcie {
  2.  public void sendMessage(String exchange,String routingKey,Object msg);
  3. }
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.rabbit.connection.CorrelationData;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
  10. import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. import cn.linkpower.service.IMessageServcie;
  14. @Component
  15. public class MessageServiceImpl implements IMessageServcie,ConfirmCallback,ReturnCallback {
  16.  
  17.  private static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);
  18.  
  19.  @Autowired
  20.  private RabbitTemplate rabbitTemplate;
  21.  
  22.  @Override
  23.  public void sendMessage(String exchange,String routingKey,Object msg) {
  24.   //消息发送失败返回到队列中, yml需要配置 publisher-returns: true
  25.   rabbitTemplate.setMandatory(true);
  26.   //消息消费者确认收到消息后,手动ack回执
  27.   rabbitTemplate.setConfirmCallback(this);
  28.   rabbitTemplate.setReturnCallback(this);
  29.   //发送消息
  30.   rabbitTemplate.convertAndSend(exchange,routingKey,msg);
  31.  }
  32.  @Override
  33.  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  34.   log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
  35.  }
  36.  
  37.  @Override
  38.  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  39.   log.info("---- confirm ----ack="+ack+"  cause="+String.valueOf(cause));
  40.   log.info("correlationData -->"+correlationData.toString());
  41.   if(ack){
  42.    log.info("---- confirm ----ack==true  cause="+cause);
  43.   }else{
  44.    log.info("---- confirm ----ack==false  cause="+cause);
  45.   }
  46.  }
  47. }

除了定义好了消息发送的工具服务接口外,我们还需要一个类,实现请求时产生消息,所以我们写一个controller。

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.stereotype.Controller;
  3. import org.springframework.web.bind.annotation.RequestMapping;
  4. import org.springframework.web.bind.annotation.ResponseBody;
  5. import cn.linkpower.service.IMessageServcie;
  6. @Controller
  7. public class SendMessageTx {
  8.  
  9.  @Autowired
  10.  private IMessageServcie messageServiceImpl;
  11.  
  12.  @RequestMapping("/sendMoreMsgTx")
  13.  @ResponseBody
  14.  public String sendMoreMsgTx(){
  15.   //发送10条消息
  16.   for (int i = 0; i < 10; i++) {
  17.    String msg = "msg"+i;
  18.    System.out.println("发送消息  msg:"+msg);
  19.    messageServiceImpl.sendMessage("directExchangeTx""directQueueTxRoutingKey", msg);
  20.    //每两秒发送一次
  21.    try {
  22.     Thread.sleep(2000);
  23.    } catch (InterruptedException e) {
  24.     e.printStackTrace();
  25.    }
  26.   }
  27.   return "send ok";
  28.  }
  29. }

运行springboot项目,访问指定的url,是可以观察到消息产生和消费的。

有些人会问,写到这里就够了吗,你这和之前博客相比,和没写一样啊,都是教我们如何配置,如何生产消息,如何消费消息。

所以接下来的才是重点了,我们一起研究一个事,当我们配置的消费者二出现消费消息时,出问题了,你如何能够保证像之前那样,消费者一处理剩下的消息?

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://gitee.com/zhijiantianya/yudao-cloud

  • 视频教程:https://doc.iocoder.cn/video/

三、ack配置和测试

3.1、模拟消费者二出问题

我们发送的消息格式都是 msg1、msg2、…

所以,我们不妨这么想,当我消费者二拿到的消息msg后面的数字大于3,表示我不要了。

  1. import java.io.IOException;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import com.rabbitmq.client.Channel;
  7. @Component
  8. @RabbitListener(queues="directQueueTx")
  9. public class Consumer2 {
  10.  @RabbitHandler
  11.  public void process(String msg,Channel channel, Message message) throws IOException {
  12.   //拿到消息延迟消费
  13.   try {
  14.    Thread.sleep(1000*3);
  15.   } catch (InterruptedException e) {
  16.    e.printStackTrace();
  17.   }
  18.   
  19.   
  20.   
  21.   try {
  22.    if(!isNull(msg)){
  23.     String numstr = msg.substring(3);
  24.     Integer num = Integer.parseInt(numstr);
  25.     if(num >= 3){
  26.      channel.basicNack(message.getMessageProperties().getDeliveryTag(),
  27.        falsetrue);
  28.      System.out.println("get msg2 basicNack msg = "+msg);
  29.     }else{
  30.      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  31.      System.out.println("get msg2 basicAck msg = "+msg);
  32.     }
  33.    }
  34.   } catch (Exception e) {
  35.    //消费者处理出了问题,需要告诉队列信息消费失败
  36.    channel.basicNack(message.getMessageProperties().getDeliveryTag(),
  37.      falsetrue);
  38.    System.err.println("get msg2 failed msg = "+msg);
  39.   }
  40.  }
  41.  
  42.  public static boolean isNull(Object obj){
  43.   return obj == null || obj == ""||obj == "null";
  44.  }
  45. }

再次请求接口,我们统计日志信息打印发现:

26346d4dec2604166905110d6496f09b.png

发现:

当我们对消息者二进行限制大于等于3时,不接受消息队列传递来的消息时,消息队列会随机重发那条消息,直至消息发送至完好的消费者一时,才会把消息消费掉。

四、分析几个回执方法

4.1、确认消息

channel.basicAck(long deliveryTag, boolean multiple);
7f27b24ba705ea1d26f259151e7f2743.png

我们一般使用下列方式:

  1. channel.basicAck(
  2. message.getMessageProperties().getDeliveryTag(), 
  3. false);

4.2、拒绝消息

channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) ;
0680698fadf9f0ce0cad5cfce4da98d0.png

我们接下来还是修改消费者二,将这个方法最后个参数更改为false,看现象是什么?

  1. import java.io.IOException;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import com.rabbitmq.client.Channel;
  7. @Component
  8. @RabbitListener(queues="directQueueTx")
  9. public class Consumer2 {
  10.  @RabbitHandler
  11.  public void process(String msg,Channel channel, Message message) throws IOException {
  12.   //拿到消息延迟消费
  13.   try {
  14.    Thread.sleep(1000*3);
  15.   } catch (InterruptedException e) {
  16.    e.printStackTrace();
  17.   }
  18.   
  19.   
  20.   
  21.   try {
  22.    if(!isNull(msg)){
  23.     String numstr = msg.substring(3);
  24.     Integer num = Integer.parseInt(numstr);
  25.     if(num >= 3){
  26.      channel.basicNack(message.getMessageProperties().getDeliveryTag(),
  27.        falsefalse);
  28.      System.out.println("get msg2 basicNack msg = "+msg);
  29.     }else{
  30.      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  31.      System.out.println("get msg2 basicAck msg = "+msg);
  32.     }
  33.    }
  34.   } catch (Exception e) {
  35.    //消费者处理出了问题,需要告诉队列信息消费失败
  36.    channel.basicNack(message.getMessageProperties().getDeliveryTag(),
  37.      falsetrue);
  38.    System.err.println("get msg2 failed msg = "+msg);
  39.   }
  40.  }
  41.  
  42.  public static boolean isNull(Object obj){
  43.   return obj == null || obj == ""||obj == "null";
  44.  }
  45. }
d4abaf13d86f431221131eeacdf8760d.png

重启项目,重新请求测试接口。

51b9e83f373ba5f060de1f65b80d72e5.png

发现,当出现设置参数为false时,也就是如下所示的设置时:

  1. channel.basicNack(
  2.  message.getMessageProperties().getDeliveryTag(),
  3.  false
  4.  false);

如果此时消费者二出了问题,这条消息不会重新回归队列中重新发送,会丢失这条数据。

并且再消息队列中不会保存:

3763b1b3fe1531299ff1edad7e6436eb.png

4.3、拒绝消息

channel.basicReject(long deliveryTag, boolean requeue);
55e80a6893ba950a0b638cda36f5a18d.png

这个和上面的channel.basicNack又有什么不同呢?我们还是修改消费者二实验下。

cfa45f6f632d54be7b38bc1f8316c178.png

请求测试接口,查看日志信息。

be523112ce16e953c0a5f8124a4eb793.png

发现,此时的日志信息配置

  1. channel.basicReject(
  2. message.getMessageProperties().getDeliveryTag(),
  3.  true);

  1. channel.basicNack(
  2. message.getMessageProperties().getDeliveryTag(),
  3. falsetrue);

实现的效果是一样的,都是将信息拒绝接收,由于设置的requeue为true,所以都会将拒绝的消息重新入队列中,重新进行消息分配并消费。

五、总结

这一篇博客,我们总结了相关的配置,三个确认(或回执)信息的方法,并区别了他们的各项属性,也知道了当消息再一个消费者中处理失败了,如何不丢失消息重新进行消息的分配消费问题。

但是这个只是队列和消费者之间的消息确认机制,使用手动ACK方式确保消息队列中的消息都能在消费者中成功消费。那么,消息转发器和消息队列之间呢?消息生产者和消息转发器之间呢?

当然,差点忘了一个小问题。

我们思考一个问题,如果消息队列对应的消费者只有一个,并且那个消费者炸了,会出现什么问题呢??



欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

1e485801776bfbf81aaea444615a95f1.png

已在知识星球更新源码解析如下:

7e053f9d600a6451f02a2dde7c45c200.jpeg

ded43006fe22542cc400daa3cb9f558b.jpeg

5ee3bfb94b45f619e2873669135263c4.jpeg

3bf34df3d8425ef3db4987bc41aaed73.jpeg

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 4W 行代码的电商微服务项目。

获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

  1. 文章有帮助的话,在看,转发吧。
  2. 谢谢支持哟 (*^__^*)
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/127122
推荐阅读
相关标签
  

闽ICP备14008679号