赞
踩
本节讲解下当MQ消息消费失败,或者发送不成功时如何处理消息,消息发送不成功一般存在于几种情况,网络原因,服务宕机,或者broker配置
如果是由于broker配置原因,可以通过报错提示排查原因:
无法查到路由信息,一般考虑到rocketMQ读取路由信息过程:
客户端报消息发送超时,通常第一怀疑的对象是RocketMQ服务器,是不是Broker性能出现了抖动,无法抗住当前的量。那我们如何来排查RocketMQ当前是否有性能瓶颈呢?
查看rocketMQ日志:
- cd ~/logs/rocketmqlogs/
- grep -n 'PAGECACHERT' store.log | more
因为网络抖动原因出现的消息超时,通过减少消息发送的超时时间,增加重试次数,并增加快速失败的最大等待时长。具体措施如下:
增加Broker端快速失败的时长,建议为1000,在broker的配置文件中增加如下配置:
maxWaitTimeMillsInQueue=1000
- DefaultMQProducer producer = new DefaultMQProducer("dw_test_producer_group");
- producer.setRetryTimesWhenSendFailed(5);// 同步发送模式:重试次数
- producer.setRetryTimesWhenSendAsyncFailed(5);// 异步发送模式:重试次数
- producer.start();
- producer.send(msg,500);//消息发送超时时间
首先对于广播模式的消息, 是不存在消息重试的机制的,即消息消费失败后,不会再重新进行发送,而只是继续消费新的消息。而对于普通的消息,当消费者消费消息失败后,你可以通过设置返回状态达到消息重试的结果。
如何让消息进行重试 ,集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。可以有三种配置方式:
- public class MessageListenerImpl implements MessageListener {
- @Override
- public Action consume(Message message, ConsumeContext context) {
- //处理消息
- doConsumeMessage(message);
- //方式1:返回 Action.ReconsumeLater,消息将重试
- return Action.ReconsumeLater;
- //方式2:返回 null,消息将重试
- return null;
- //方式3:直接抛出异常, 消息将重试
- throw new RuntimeException("Consumer Message exceotion");
- }
- }
如果希望消费失败后不重试,可以直接返回Action.CommitMessage。
- public class MessageListenerImpl implements MessageListener {
- @Override
- public Action consume(Message message, ConsumeContext context) {
- try {
- doConsumeMessage(message);
- } catch (Throwable e) {
- //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
- return Action.CommitMessage;
- }
- //消息处理正常,直接返回 Action.CommitMessage;
- return Action.CommitMessage;
- }
- }
生产者消息重试策略:
如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试
DefaultMQProducer可以设置消息发送失败的最大重试次数,并可以结合发送的超时时间来进行重试的处理,具体API如下:
-
- //设置消息发送失败时的最大重试次数
- public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
- this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
- }
-
- //同步发送消息,并指定超时时间
- public SendResult send(Message msg,
- long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return this.defaultMQProducerImpl.send(msg, timeout);
- }
消费者消费异常重试
异常重试:由于Consumer端逻辑出现了异常,导致返回了RECONSUME_LATER状态,那么Broker就会在一段时间后尝试重试。
超时重试:如果Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为Consumer消费超时,此时会发起超时重试。
因此,如果Consumer端正常消费成功,一定要返回ConsumeConcurrentlyStatus.ConsumeConcurrentlyStatus状态
异常重试
RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:
- messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- 1
但是在大部分情况下,如果Consumer端逻辑出现异常,重试太多次也没有很大的意义,我们可以在代码中指定最大的重试次数。
defaultMQPushConsumer.setMaxReconsumeTimes(5);
当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。
死信队列的名称是%DLQ%+ConsumGroup
死信队列的特征:
通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。
注:默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者中都无法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要手动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台)。
幂等的概念
在MQ系统中,对于消息幂等有三种实现语义:
这三种语义都有他适用的业务场景。
关于这个问题,官网上有明确的回答:4. Are messages delivered exactly once?RocketMQ ensures that all messages are delivered at least once. In most cases, the messages are not repeated.
消息幂等的必要性
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
处理方式
从上面的分析中,我们知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。