赞
踩
生产者发送消息失败会重试,可以通过参数来设置:
创建producer实例设置参数:
- // 失败的情况重发3次
- producer.setRetryTimesWhenSendFailed(3);
- // 消息在1S内没有发送成功,就会重试
- producer.send(msg, 1000);
application配置文件:
- rocketmq:
- name-server: 10.3.22.103:9876;10.3.22.115:9876;10.3.22.121:9876
- # Producer 生产者
- producer:
- # 异步消息重试此处,默认2
- retry-times-when-send-async-failed: 2
- # 发送消息失败重试次数,默认2
- retry-times-when-send-failed: 2
注意:
抛出特定的异常才会重试,异常的类型仅包括以下几种,system busy和broker busy这两个错误码不会重试:
消息消费要满足订阅关系一致性,即一个consumerGroup中的所有消费者订阅的topic和tag必须保持一致,不然就会造成消息丢失。
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
无序消息的重试只针对集群(MessageModel.CLUSTERING)消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
对于无序消息(普通、延时、事务消息),消息队列 RocketMQ 默认允许每条消息最多重试 16 次,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。每次重试的间隔时间如下:
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分钟 |
2 | 30 秒 | 10 | 8 分钟 |
3 | 1 分钟 | 11 | 9 分钟 |
4 | 2 分钟 | 12 | 10 分钟 |
5 | 3 分钟 | 13 | 20 分钟 |
6 | 4 分钟 | 14 | 30 分钟 |
7 | 5 分钟 | 15 | 1 小时 |
8 | 6 分钟 | 16 | 2 小时 |
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
可以通过以下两种方式修改重试次数:
通过设置返回状态达到消息重试的结果,返回枚举类ConsumeConcurrentlyStatus的两个状态,来设置是否需要重试:
- public class AbstractRocketMQListener implements MessageListenerConcurrently {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- try {
- // 业务逻辑
- doConsume();
- //消费成功
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }catch (Exception e){
- //消费失败,重试
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
-
- }
- }
还有另外一种方式,继承RocketMQListener类,增加注解RocketMQMessageListener,通过设置消息的参数来监听消息,maxReconsumeTimes默认为16次,可以通过设置,修改重试的次数。
- @Component
- @RocketMQMessageListener(
- topic = "ORDER_RECEIVE_TOPIC",
- consumerGroup = "CONSUMER_ORDER_RECEIVE_GROUP",
- // 指定消费者线程数,默认20,生产中请注意配置,避免过大或者过小
- consumeThreadNumber = 60,
- maxReconsumeTimes = 5
- )
- public class AbstractRocketMQListener implements RocketMQListener<MessageExt> {
- @Override
- public void onMessage(MessageExt message) {
- try {
- //业务逻辑
- doConsume();
- return;
- } catch (CommonMsgException ce) {
- //业务异常,不需要重试,直接退出
- } catch (Exception e) {
- //异常处理
- //抛出异常重试
- throw new RuntimeException(e);
- }
-
- }
- }
注意:
如果业务异常不需要重试的,比如参数有误、校验不通过,重试没有用。可以定义业务异常类CommonMsgException,直接退出。
并发消费失败后并不是投递回原Topic,而是投递到一个 特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。
对于死信的处理方案有多种,有两种方案:
消息监听和普通的消息监听一样。就是如果很多Topic都产生了死信消息,那么我们想要处理这些死信消息就得编写很多个监听各个死信队列的消费者。
- @Component
- @RocketMQMessageListener(
- topic = "ORDER_RECEIVE_TOPIC",
- consumerGroup = "CONSUMER_ORDER_RECEIVE_GROUP",
- // 指定消费者线程数,默认20,生产中请注意配置,避免过大或者过小
- consumeThreadNumber = 60
- )
- public class AbstractRocketMQListener implements RocketMQListener<MessageExt> {
- @Override
- public void onMessage(MessageExt message) {
- try {
- //业务逻辑
- doConsume();
- return;
- } catch (CommonMsgException ce) {
- //业务异常,不需要重试,直接退出
- } catch (Exception e) {
- // 如果重试次数超过2次就不需要重试了
- if (message.getReconsumeTimes() >= 2) {
- //不要重试了
- } else {
- //重试
- throw new RuntimeException(e);
- }
- }
-
- }
- }
@RocketMQMessageListener 设置了MaxreconsumeTimes=20,但还是MessageExt中reconsumeTimes为什么有时候还是0,有以下几个原因:
消息尚未被消费失败:reconsumeTimes
只会在消息消费失败时增加。如果消息首次被成功消费(即没有抛出异常),则 reconsumeTimes
的值将保持为 0。
消息被其他消费者实例成功消费:在消费者组(Consumer Group)中,可能有多个消费者实例同时运行。如果一个实例消费失败,而另一个实例成功消费了这条消息,那么 reconsumeTimes
不会增加。
配置未生效:请确保您的 maxReconsumeTimes
配置已经正确设置并且已经生效。检查消费者配置是否正确,以及是否重新启动了消费者以使配置生效。
消息未到达重试阶段:如果消息在首次尝试时就被成功消费,那么它不会进入重试阶段。只有当消息首次消费失败时,才会开始重试,并增加 reconsumeTimes
。
消息被其他消费者组消费:请确保没有其他消费者组也订阅了相同的主题(Topic)和标签(Tag),并且可能在这些消费者组中成功消费了消息。
如果消息失败过,重试次数加1。如果之前消息消费成功过,不会重试。实例断开,即使服务器重启,重试次数累加的。messageId不变
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。