赞
踩
先来回顾一下前文,我们先基于Java原生语言,利用多线程和锁实现了串行/并行任务(Java串行/并行任务实现);之后利用SpringBoot为我们封装好的功能,尝试用SpringBoot自带的API实现了异步调用,并在此基础上,统一管理了多线程的事务(SpringBoot异步任务及并行事务实现)。
对于多线程的使用,我们已经有了一个全面的认知,系统响应能力也确实有了不小的提升。但随着系统负载持续增大,如果每个用户请求进来都为其分配线程,线程池打满后用户也只能一直等待;计算量过大、线程数过多时,CPU也会承受不了压力。线程是针对单进程的概念,天生不支持分布式,如果某个下游服务需要处理许多上游服务发送的请求,此时多线程就不一定能提升效率了——因为既要兼顾大量计算,又要快速在线程间进行切换,单机负载会影响整个链路的响应速度。
针对这种情况最好的解决方案,就是引入消息队列中间件。不过要注意,不能说感觉有用就用了,引入新中间件付出的代价也是极大的。需要整体评估系统的复杂性和稳定性,以及功能是否有必要解耦。比如说只是一个响应速度很快的插库,放入消息队列还需要耗费一个网络通信的时间,此时就完全没有必要;或是该操作必须放在主线程中同步,下面的操作都要依据该操作结果来判断,例如我要获取支付系统当前能否正常响应,如能正常响应再进行支付,这时哪怕这个动作再慢,你也得等着。
使用MQ的最佳场景:流量削峰、异步解耦,本篇我们仅就异步解耦和RabbitMQ的一些特性介绍,其他更系统化的应用以后再详细说。
整体基于前几篇文章介绍的场景,用户下单成功后一直未支付,系统就会做如下几件事:
超时订单取消是基于DelayQueue做的,这里不做详细介绍。更简单点说,可以理解为下单成功后就发短信通知用户“下单成功了”,发送短信一般依赖第三方服务,是一个较为耗时的操作,但又不严格属于整个下单流程内,因此是可以解耦出来的。
那么现在下单的整体流程就为:
用户下单 -> 延时队列监测超时订单 -> 超时订单处理逻辑 -> MQ消费者异步发送邮件
(短信要钱,邮件免费,所以这里用邮件代替一下 )
生产者的逻辑很简单,就是一行convertAndSend()指定交换机、路由键和发送的实体类,还有个小细节,如果MQ接收的Body为实体类,消息转换器要使用“Jackson2JsonMessageConverter”,这样消息的Content-Type就被指定为Json格式了,否则会无法正常序列化。
- //发送邮件队列信息
- private static final String eMailExchange = "my.order";
- private static final String eMailQueue = "order.email";
- private static final String eMailRoutingKey = "order.email";
-
- //实际业务逻辑就一行
- MailUtils.sendMail(eMail.getAddress(), eMail.getSubject(), eMail.getContent(), true);
我们按照上面的信息创建好交换机,将队列绑定到指定的交换机上,具体怎么绑就不赘述了,不管是去RabbitMQ Management手动创建,还是用Java Bean形式创建都可以。
消费者的逻辑很简单,监听消费指定队列、拿到入参的EMail实体类、根据实体类属性发送邮件。但这也太简单了,那就顺便使用一下RabbitMQ自带的功能——手动/自动Ack。
Ack为"Acknowledge Character",意思是确认字符,源于网络通信的概念。RabbitMQ中包含"Ack"和"Nack",用于告知MQ该条消息正常消费/消费异常,有几个参数需要注意。
- //Ack
- channel.basicAck(deliveryTag, false); //仅确认该条消息
- channel.basicAck(deliveryTag, true); //确认所有已完成消息
-
- //Nack
- channel.basicNack(deliveryTag, false, false); //消费失败,丢弃消息
- channel.basicNack(deliveryTag, false, true); //消费失败,放入队列重新消费
RabbitMQ默认Ack模式是"Auto",也就是会自动Ack,这是为了防止用户没有手动Ack导致消费消息一直积压在Unacked队列中,导致MQ服务OOM而死。但实际上手动Ack是比较合理的选择,一是能够提高MQ的响应能力,我消费完了立马告知MQ,可以处理下一条了;二是更加安全,Auto模式下即使消息消费异常,还是会自动Ack,这条消息就无影无踪了。
最后一点,也是最重要的一点,消息正在消费的时候MQ服务挂了,如果是Auto模式,这条消息会直接丢失,因为消费者在获取到这条消息时就会自动Ack;但如果是Manual模式,一切都改变了,由于Broker没有接收到你的Ack/Nack,消息会处于Unacked状态,在下次服务恢复正常时会重新进行消费,振奋人心!
因此我们把MQ设置为手动Ack模式"Manual",在消费方法中trycatch,正常消费就Ack,发现异常就Nack并把消息丢弃。响应能力确实提高了,消息确认也更灵活了,但是看起来怪怪的——不论是消费成功还是失败,消息最终都被丢弃了。有人会说异常就Nack消息放回队列重新消费,但如果这条消息一直消费失败,这条消息会被无限消费,这是十分可怕的。可以实验一下,设置"prefetch = 1",消费者每次只能获取一条消息进行消费,有一条消息异常重入队列后,这个消费者就永远卡在这了。
但我们还是想多给消费者几次机会,起码试几次再让他丢掉嘛,RabbitMQ提供了一种优雅地重试方式“Retry机制”。
只需要修改一下配置文件,就能开启RabbitMQ的本地重试机制,之所以称他为“本地重试”是由于消费者是将该消息在本机重试,不与MQ服务交互。
- retry:
- enabled: true
- max-attempts: 3 #重试次数
- initial-interval: 3000 #间隔时间, ms
- max-interval: 5000 #重试最大间隔时间, ms
- multiplier: 1 #负载因子, 重试间隔时间倍数, 默认1
这样就开启了重试功能,只要消费者抛出异常就会以指定间隔时间、重试指定次数,记得不要catch住异常哦,哪怕catch了也要再抛出去。
但是默认的Retry机制并不是完美的,他有几个很明显的缺陷:
第二个问题比较好解决,可以给队列绑定一个死信队列,指定死信交换机和路由键,如下这两个属性。 在重试到达上限后,会放入指定的死信队列,可以由监听死信队列的消费者进行后续补偿处理。
x-dead-letter-exchange: email.dead x-dead-letter-routing-key: email.dead
或者是自定义"MessageRecoverer"并注入,默认的消息恢复器是"RejectAndDontRequeueRecoverer",意为拒绝且不重入队列,在到达上限后会报错告诉你重试次数耗尽然后丢掉消息。这个显然不是很好用,我们可以用"RepublishMessageRecoverer",将异常消息重发至死信队列。
但是使用Retry就注定和手动Ack是无缘了,我们需要设计一个两全的方案,既保留手动Ack的安全性,又兼顾Retry机制的稳定性——那就来手动实现一下Retry吧。
实现思路是使用Redis标记该消息的重试次数,在未达到重试上限前,使用Nack将消息重入队列;达到重试上限后,将消息Nack自动发送至死信队列。使用该方法的重点,就是要给每条消息携带一个唯一ID,可以使用UUID或是Snowflake。直接上代码。
- //利用Redis手动实现重试机制
- private void retryExecute(Channel channel, Message message, EMail eMail, Map<String, Object> headers) throws IOException {
- MessageProperties messageProperties = message.getMessageProperties();
- String redisKey = RETRY_EXECUTE_TIMES_KEY.concat(":").concat(eMail.getMessageId());
- Object value = RedisUtil.get(redisKey);
- if (Objects.isNull(value)){
- //当前为第一次执行,返回重试
- RedisUtil.set(redisKey, 2, 60 * 5);
- channel.basicNack(messageProperties.getDeliveryTag(), false, true);
- } else {
- Integer integer = Integer.parseInt(value.toString());
- if (integer < RETRY_EXECUTE_TIMES_MAX) {
- //当前为第二次执行,返回重试
- RedisUtil.set(redisKey, integer + 1, 60 * 5);
- channel.basicNack(messageProperties.getDeliveryTag(), false, true);
- } else {
- log.error("3次了,不试了,扔死信队列了");
- channel.basicNack(messageProperties.getDeliveryTag(), false, false);
- }
- }
-
- }
我们给每条记录5分钟的超时时间,足够消费者进行重试了。 都整好了我们来试验一下,运行一下看看日志。
- 2023-02-24 16:32:29.122 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 该报错了哈
- 2023-02-24 16:32:29.122 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 已经报错了哈
- 2023-02-24 16:32:29.931 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 该报错了哈
- 2023-02-24 16:32:29.931 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 已经报错了哈
- 2023-02-24 16:32:29.935 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 该报错了哈
- 2023-02-24 16:32:29.935 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 已经报错了哈
- 2023-02-24 16:32:29.936 ERROR 21460 --- [ntContainer#0-1] com.consumer.service.impl.EMailConsumer : 3次了,不试了,扔死信队列了
结果符合我们的预期,消息也确实从 order.email 移入了 email.dead 死信队列中。这里有这么多条是因为我之前光放没消费,不用太在意。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。