赞
踩
消息队列RocketMQ入门实践(一)
消息队列RocketMQ入门实践(二)
消息队列RocketMQ入门实践–关键特性(三)
消息队列RocketMQ入门实践–关键特性(四)
消息队列RocketMQ入门实践–消息存储(五)
在消息的发送和消费过程中,都有可能出现错误,如网络异常等,出现了错误就需要进行错误重试,这种消息的重试需要分2种,分别是producer端重试和consumer端重试。
今天这篇文章就来聊一聊RocketMQ是如何进行消息重试策略的。
生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。
Producer 的 send 方法本身支持内部重试,重试逻辑如下:
所以,如果本身往 broker 发送消息产生超时异常,就不会再做重试。
以上策略仍然不能保证消息一定发送成功,为保证消息一定成功,建议应用返样做:如果调用 send 同步方法发送失败,则尝试将消息存储到 db,由后台线程定时重试,保证消息一定到达 Broker。
上述 db 重试方式为什么没有集成到 MQ 客户端内部做,而是要求应用自己去完成,是基于以下几点考虑:
- MQ 的客户端设计为无状态模式,方便任意的水平扩展,且对机器资源的消耗仅仅是 cpu、内存、网络。
- 如果 MQ 客户端内部集成一个 KV 存储模块,那举数据只有同步落盘才能较可靠,而同步落盘本身性能开销较大,所以通常会采用异步落盘,又由于应用关闭过程不受 MQ 运维人员控制,可能经常会发生 kill -9 返样暴力方式关闭,造成数据没有及时落盘而丢失。
- Producer 所在机器的可靠性较低,一般为虚拟机,不适合存储重要数据。
综上,建议重试过程交由应用来控制。
消费者端的失败,分为2种情况,一种是exception,一种是timeout。
消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。
消息的状态:
public enum ConsumeConcurrentlyStatus {
CONSUME_SUCCESS,
RECONSUME_LATER;
}
可以看到,消息的状态分为成功或者失败。如果返回的状态为失败会怎么样呢?
在启动broker的日志中可以看到这样的信息:
这个表示了,如果消息消费失败,那么消息将会在1s、5s、10s后重试,一直到2h后不再重试。
其实,有些时候并不需要重试这么多次,一般重试3~5次即可。这个时候就可以通过msg.getReconsumeTimes()获取重试次数进行控制。
代码如下(示例):
public class ConsumerDemo { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xiliu_consumer_group"); consumer.setNamesrvAddr("42.194.222.32:9876"); // 订阅topic,接收此Topic下的所有消息 consumer.subscribe("my-test-topic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { System.out.println(new String(msg.getBody(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } System.out.println("收到消息->" + msgs); if(msgs.get(0).getReconsumeTimes() >= 3){ // 重试3次后,不再进行重试 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start(); } }
比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止!
也就是说,服务端没有接收到消息的反馈,既不是成功也不是失败,这个时候定义为超时。
造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?
如《RocketMQ 原理简介》中所述,RocketMQ 无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务局面去重,有以下几种去重方式
第1条解决方案很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。很明显应该在消费端实现,不属于消息系统要实现的功能
第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。这种方案可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。
感谢大家的阅读,以上就是全部内容了。本文讲述了producer端重试和consumer端重试的机制,以及重复消费的解决方案。不足之处,还望多多批评指正。
若觉得本文对您有帮助的话,帮忙点赞评论关注,支持一波哟~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。