赞
踩
rocketmq生产者发送消息失败默认重试2次(同步发送为2次,异步发送为0次)。
当然也可以自定义重试次数及机制:
// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);
若Consumer消费某条消息失败,则RocketMQ会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。
消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
consumer.setMaxReconsumeTimes(5);
在实际生产中,一般重试3-5次,如果还没有消费成功,则可以把消息签收,通知人工介入。
consumer.setSuspendCurrentQueueTimeMillis(5000);
示例:
消费者重试示例:
@Test public void retryConsumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group"); consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR); consumer.subscribe("retryTopic", "*"); //设置最大重试次数 consumer.setMaxReconsumeTimes(2); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = list.get(0); System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes()); //业务报错了 返回null 返回RECONSUME_LATER 都会重试 if(true){ throw new RuntimeException(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.in.read(); }
顺序消费和并发消费的重试机制并不相同,顺序消费消费失败后会先在客户端本地重试直到最大重试次数,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序,而并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。
并发消费失败后并不是投递回原Topic,而是投递到一个 特殊Topic
,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。 两者参数差别如下:
消费类型 | 重试间隔 | 最大重试次数 |
---|---|---|
顺序消费 | 间隔时间可通过自定义设置,SuspendCurrentQueueTimeMillis | 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX |
并发消费 | 间隔时间根据重试次数阶梯变化,取值范围:1秒~2小时。不支持自定义配置 | 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值。(根据延时等级划分,共16次) |
上面提到的 特殊Topic
称为死信Topic,对应的队列就是死信队列,其中存储的消息就是死信消息。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。应该通知人工介入处理。
对于死信的处理方案有多种,这里演示两种:
方案一:
@Test public void deadConsumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group"); consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR); consumer.subscribe("%DLQ%retry-consumer-group", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = list.get(0); System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes()); System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理"); //业务报错了 返回null 返回RECONSUME_LATER 都会重试 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.in.read(); }
该方案有一个缺点,就是如果很多Topic都产生了死信消息,那么我们想要处理这些死信消息就得编写很多个监听各个死信队列的消费者。
方案二:
针对方案一的缺点,方案二能够比较好的解决。
@Test public void retryConsumer2() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group"); consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR); consumer.subscribe("retryTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = list.get(0); System.out.println(new Date()); try { handleDb();// 10/0 } catch (Exception e) { if (messageExt.getReconsumeTimes() >= 2) { //不要重试了 System.out.println("消息体:" + new String(messageExt.getBody())); System.out.println("记录到特别的位置如mysql,发送邮件或短信通知人工处理"); } else { //重试 System.out.println("时间:" + new Date() + "\t消息体:" + new String(messageExt.getBody()) + "\t重试次数:" + messageExt.getReconsumeTimes()); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.in.read(); } private void handleDb() { int i = 10 / 0; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。