赞
踩
目录
消费者中我们注册了一个监听器回调函数,当Consumer获取消息后,就会交给我们的回调函数来进行处理。如果处理完了,就返回一个ConsumeConcurrentlyStatus.CONSUME_SUCCESS,提交这批消息的offset到broker去,然后继续从broker获取下一批消息来进行处理。 如果上面代码回调函数中,对一批消息处理的时候,数据库宕机了就不能再能返回CONSUME_SUCCESS,如果你返回的话,下一次就会处理下一批消息,但是这批消息其实没处理成功,此时必然导致这批消息就丢失了;
返回RECONSUME_LATER
- consumer.registerMessageListener(
- new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(
- List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- try {
- // 后续消息处理逻辑
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- } catch (Exception e) {
- // 比如因为数据库宕机了,没法对消息完成处理
- // 此时可以返回一个稍后重试的消费状态
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
- }
- }
- );
如果消费端消费失败的话,过段时间再次给我这批消息重试;
只有当消费模式为集群模式时,Broker 才会自动进行重试,对于广播消息是不会重试的;
RocketMQ会有一个针对你这个ConsumerGroup的重试队列,如果你返回了RECONSUME_LATER状态,他就会把你这批消息放到你这个消费组的重试队列中去,比如消费组是"WMSConsumerGroup",那么就会有一个“%RETRY%WMSConsumerGroup”,这个名字的重试队列;
然后过一段时间,重试队列中的消息会再次进行处理,如果再次失败,又返回了RECONSUME_LATER,那么会再过一段时间让我们再次进行处理,默认最多重试16次;每次重试之间的间隔时间是不一样的,这个间隔时间可以如下进行配置:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
意思就是,第一次重试是1秒后,第二次重试是5秒后,第三次重试是10秒后,第四次重试是30秒后,第五次重试是1分钟后,以此类推,最多重试16次;
刚才我们说如果你返回了RECONSUME_LATER,消息就会进入重试队列,其实不完全准确;
当MQ接收到RECONSUME_LATER后,首先会完成消息的转换,把消息存到延时队列中,然后再根据消息的延时时间保存到重试队列中;
如果重试了16次之后依然无法处理,就会把这些消费放入死信队列。死信队列中的消息RocketMQ不会再做处理,这部分数据要怎么处理就要看我们的业务场景了,我们可以做一个后台线程去订阅这个死信队列,完成后续消息的处理;
如果在16次重试范围内消息处理成功了,自然就没问题了;但是如果对一批消息重试了16次还是无法成功处理,就需要另外一个队列了,叫做死信队列,死信队列的名字是“%DLQ%WMSConsumerGroup”;
对死信队列中的消息处理,这个就看具体需求,比如可以专门开一个后台线程,订阅“%DLQ%WMSConsumerGroup”这个死信队列,对死信队列中的消息进行不停的重试;
当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。这个队列就是死信队列(Dead-Letter Queue,DLQ),而其中的消息则称为死信消息(Dead-Letter Message,DLM)。
死信队列是用于处理无法被正常消费的消息的。
死信队列具有如下特征:
实际上,当⼀条消息进入死信队列,就意味着系统中某些地方出现了问题,从而导致消费者无法正常消费该消息,比如代码中原本就存在Bug。因此,对于死信消息,通常需要开发人员进行特殊处理。最关键的步骤是要排查可疑因素,解决代码中可能存在的Bug,然后再将原来的死信消息再次进行投递消费。
死信的perm默认是2,表示只写。
perm用于设置对当前创建Topic的操作权限:2表示只写,4表示只读,6表示读写。
如果想消费死信队列的消息,需要修改perm的值为6,便可消费
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。