当前位置:   article > 正文

RabbitMQ异步与重试机制_rabbitmq 重试机制测试

rabbitmq 重试机制测试

        先来回顾一下前文,我们先基于Java原生语言,利用多线程和锁实现了串行/并行任务(Java串行/并行任务实现);之后利用SpringBoot为我们封装好的功能,尝试用SpringBoot自带的API实现了异步调用,并在此基础上,统一管理了多线程的事务(SpringBoot异步任务及并行事务实现)。

        对于多线程的使用,我们已经有了一个全面的认知,系统响应能力也确实有了不小的提升。但随着系统负载持续增大,如果每个用户请求进来都为其分配线程,线程池打满后用户也只能一直等待;计算量过大、线程数过多时,CPU也会承受不了压力。线程是针对单进程的概念,天生不支持分布式,如果某个下游服务需要处理许多上游服务发送的请求,此时多线程就不一定能提升效率了——因为既要兼顾大量计算,又要快速在线程间进行切换,单机负载会影响整个链路的响应速度。

        针对这种情况最好的解决方案,就是引入消息队列中间件。不过要注意,不能说感觉有用就用了,引入新中间件付出的代价也是极大的。需要整体评估系统的复杂性和稳定性,以及功能是否有必要解耦。比如说只是一个响应速度很快的插库,放入消息队列还需要耗费一个网络通信的时间,此时就完全没有必要;或是该操作必须放在主线程中同步,下面的操作都要依据该操作结果来判断,例如我要获取支付系统当前能否正常响应,如能正常响应再进行支付,这时哪怕这个动作再慢,你也得等着。

        使用MQ的最佳场景:流量削峰、异步解耦,本篇我们仅就异步解耦和RabbitMQ的一些特性介绍,其他更系统化的应用以后再详细说。

1 场景介绍

        整体基于前几篇文章介绍的场景,用户下单成功后一直未支付,系统就会做如下几件事:

  1. 超时订单自动取消、更新订单状态
  2. 归还原有库存
  3. 短信通知用户

        超时订单取消是基于DelayQueue做的,这里不做详细介绍。更简单点说,可以理解为下单成功后就发短信通知用户“下单成功了”,发送短信一般依赖第三方服务,是一个较为耗时的操作,但又不严格属于整个下单流程内,因此是可以解耦出来的。

        那么现在下单的整体流程就为:

用户下单 -> 延时队列监测超时订单 -> 超时订单处理逻辑 -> MQ消费者异步发送邮件

        (短信要钱,邮件免费,所以这里用邮件代替一下 )

2 业务逻辑编写

        生产者的逻辑很简单,就是一行convertAndSend()指定交换机、路由键和发送的实体类,还有个小细节,如果MQ接收的Body为实体类,消息转换器要使用“Jackson2JsonMessageConverter”,这样消息的Content-Type就被指定为Json格式了,否则会无法正常序列化。

  1. //发送邮件队列信息
  2. private static final String eMailExchange = "my.order";
  3. private static final String eMailQueue = "order.email";
  4. private static final String eMailRoutingKey = "order.email";
  5. //实际业务逻辑就一行
  6. MailUtils.sendMail(eMail.getAddress(), eMail.getSubject(), eMail.getContent(), true);

        我们按照上面的信息创建好交换机,将队列绑定到指定的交换机上,具体怎么绑就不赘述了,不管是去RabbitMQ Management手动创建,还是用Java Bean形式创建都可以。

        消费者的逻辑很简单,监听消费指定队列、拿到入参的EMail实体类、根据实体类属性发送邮件。但这也太简单了,那就顺便使用一下RabbitMQ自带的功能——手动/自动Ack。

3 手动/自动 Ack/Nack

        Ack为"Acknowledge Character",意思是确认字符,源于网络通信的概念。RabbitMQ中包含"Ack"和"Nack",用于告知MQ该条消息正常消费/消费异常,有几个参数需要注意。

  1. //Ack
  2. channel.basicAck(deliveryTag, false); //仅确认该条消息
  3. channel.basicAck(deliveryTag, true); //确认所有已完成消息
  4. //Nack
  5. channel.basicNack(deliveryTag, false, false); //消费失败,丢弃消息
  6. channel.basicNack(deliveryTag, false, true); //消费失败,放入队列重新消费
  1. 第一个参数为每条消息的唯一标识deliveryTag,用于确认指定消息。
  2. 第二个参数为单条确认或批量确认,传入true即为确认所有已消费消息,传入false为仅确认该条消息;一般用false,自己确认自己的就行,批量确认能节省一点网络开销,但没必要。
  3. 第三个参数只有Nack才有,意为是否重新消费,传入true则重新放入队列头部,再次进行消费;传入false则直接丢弃。但是需要注意的是,重入队列会放在队列头部,等于会立即进行重新消费,如果该消息一直报错,就会阻塞该队列。

        RabbitMQ默认Ack模式是"Auto",也就是会自动Ack,这是为了防止用户没有手动Ack导致消费消息一直积压在Unacked队列中,导致MQ服务OOM而死。但实际上手动Ack是比较合理的选择,一是能够提高MQ的响应能力,我消费完了立马告知MQ,可以处理下一条了;二是更加安全,Auto模式下即使消息消费异常,还是会自动Ack,这条消息就无影无踪了。

        最后一点,也是最重要的一点,消息正在消费的时候MQ服务挂了,如果是Auto模式,这条消息会直接丢失,因为消费者在获取到这条消息时就会自动Ack;但如果是Manual模式,一切都改变了,由于Broker没有接收到你的Ack/Nack,消息会处于Unacked状态,在下次服务恢复正常时会重新进行消费,振奋人心!

        因此我们把MQ设置为手动Ack模式"Manual",在消费方法中trycatch,正常消费就Ack,发现异常就Nack并把消息丢弃。响应能力确实提高了,消息确认也更灵活了,但是看起来怪怪的——不论是消费成功还是失败,消息最终都被丢弃了。有人会说异常就Nack消息放回队列重新消费,但如果这条消息一直消费失败,这条消息会被无限消费,这是十分可怕的。可以实验一下,设置"prefetch = 1",消费者每次只能获取一条消息进行消费,有一条消息异常重入队列后,这个消费者就永远卡在这了。

        但我们还是想多给消费者几次机会,起码试几次再让他丢掉嘛,RabbitMQ提供了一种优雅地重试方式“Retry机制”。

4 Retry

        只需要修改一下配置文件,就能开启RabbitMQ的本地重试机制,之所以称他为“本地重试”是由于消费者是将该消息在本机重试,不与MQ服务交互。

  1. retry:
  2. enabled: true
  3. max-attempts: 3 #重试次数
  4. initial-interval: 3000 #间隔时间, ms
  5. max-interval: 5000 #重试最大间隔时间, ms
  6. multiplier: 1 #负载因子, 重试间隔时间倍数, 默认1

        这样就开启了重试功能,只要消费者抛出异常就会以指定间隔时间、重试指定次数,记得不要catch住异常哦,哪怕catch了也要再抛出去。

        但是默认的Retry机制并不是完美的,他有几个很明显的缺陷:

  • 由于无法try catch异常,也就无法使用手动Ack模式。换句话说,Retry和手动Ack是一对互斥的选项。
  • 重试到达上限次数后,也会将消息丢弃,默认不会有特殊的处理机制。

        第二个问题比较好解决,可以给队列绑定一个死信队列,指定死信交换机和路由键,如下这两个属性。 在重试到达上限后,会放入指定的死信队列,可以由监听死信队列的消费者进行后续补偿处理。

x-dead-letter-exchange:email.dead
x-dead-letter-routing-key:email.dead

        或者是自定义"MessageRecoverer"并注入,默认的消息恢复器是"RejectAndDontRequeueRecoverer",意为拒绝且不重入队列,在到达上限后会报错告诉你重试次数耗尽然后丢掉消息。这个显然不是很好用,我们可以用"RepublishMessageRecoverer",将异常消息重发至死信队列。

        但是使用Retry就注定和手动Ack是无缘了,我们需要设计一个两全的方案,既保留手动Ack的安全性,又兼顾Retry机制的稳定性——那就来手动实现一下Retry吧。

5 手动实现Retry

        实现思路是使用Redis标记该消息的重试次数,在未达到重试上限前,使用Nack将消息重入队列;达到重试上限后,将消息Nack自动发送至死信队列。使用该方法的重点,就是要给每条消息携带一个唯一ID,可以使用UUID或是Snowflake。直接上代码。

  1. //利用Redis手动实现重试机制
  2. private void retryExecute(Channel channel, Message message, EMail eMail, Map<String, Object> headers) throws IOException {
  3. MessageProperties messageProperties = message.getMessageProperties();
  4. String redisKey = RETRY_EXECUTE_TIMES_KEY.concat(":").concat(eMail.getMessageId());
  5. Object value = RedisUtil.get(redisKey);
  6. if (Objects.isNull(value)){
  7. //当前为第一次执行,返回重试
  8. RedisUtil.set(redisKey, 2, 60 * 5);
  9. channel.basicNack(messageProperties.getDeliveryTag(), false, true);
  10. } else {
  11. Integer integer = Integer.parseInt(value.toString());
  12. if (integer < RETRY_EXECUTE_TIMES_MAX) {
  13. //当前为第二次执行,返回重试
  14. RedisUtil.set(redisKey, integer + 1, 60 * 5);
  15. channel.basicNack(messageProperties.getDeliveryTag(), false, true);
  16. } else {
  17. log.error("3次了,不试了,扔死信队列了");
  18. channel.basicNack(messageProperties.getDeliveryTag(), false, false);
  19. }
  20. }
  21. }

        我们给每条记录5分钟的超时时间,足够消费者进行重试了。 都整好了我们来试验一下,运行一下看看日志。

  1. 2023-02-24 16:32:29.122 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 该报错了哈
  2. 2023-02-24 16:32:29.122 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 已经报错了哈
  3. 2023-02-24 16:32:29.931 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 该报错了哈
  4. 2023-02-24 16:32:29.931 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 已经报错了哈
  5. 2023-02-24 16:32:29.935 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 该报错了哈
  6. 2023-02-24 16:32:29.935 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 已经报错了哈
  7. 2023-02-24 16:32:29.936 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 3次了,不试了,扔死信队列了

        结果符合我们的预期,消息也确实从 order.email 移入了 email.dead 死信队列中。这里有这么多条是因为我之前光放没消费,不用太在意。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/603960
推荐阅读
相关标签
  

闽ICP备14008679号