赞
踩
首先对于广播模式的消息, 是不存在消息重试的机制的,即消息消费失败后,不会再重新进行发送,而只是继续消费新的消息。而对于普通的消息,当消费者消费消息失败后,可以通过设置返回状态达到消息重试的结果。
消息消费失败后如果希望消息重试,需要在消息监听器接口的实现中明确进行配置。可以有三种配置方式:
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//处理消息
doConsumeMessage(message);
//方式1:返回 Action.ReconsumeLater,消息将重试
return Action.ReconsumeLater;
//方式2:返回 null,消息将重试
return null;
//方式3:直接抛出异常, 消息将重试
throw new RuntimeException("Consumer Message exceotion");
}
}
如果希望消费失败后不重试,可以直接返回Action.CommitMessage。
RocketMQ的消息重试及时分为两种,一种是Producer端重试,一种是Consume端重试。
重试的消息会进入一个%RETRY%+ConsumeGroup
的队列中。然后RocketMQ默认允许每条消息最多重试16次,每次重试的时间间隔如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
这个重试时间跟延迟消息的延迟级别是对应的。不过取的是延迟级别的后16级别。
这个重试时间可以将源码中的org.apache.rocketmq.example.quickstart.Consumer里的消息监听器返回状态改为RECONSUME_LATER测试一下。
重试次数:如果消息重试16次后仍然失败,消息将不再投递。转为进入死信队列。另外一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。然后关于这个重试次数,RocketMQ可以进行定制。例如通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2小时。
关于MessageId:在老版本的RocketMQ中,一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。但是在4.7.1版本中,每次重试MessageId都会重建。
配置覆盖:消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效。并且最后启动的Consumer会覆盖之前启动的Consumer的配置。
当一条消息消费失败,消息会自动进行重试,达到最大重试次数后,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将消息丢弃,而是将其发送到该消费者组对应的特殊队列中:死信队列(Dead Letter Message)。死信队列的名称是%DLQ%+ConsumeGroup
。
死信队列特性:
通常,一条消息如果进入死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。
注:默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者中都无法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要手动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台)。
RocketMq本身并不保证消息不重复(只能保证至少发送一次),这样肯定会因为每次的判断,导致性能打折扣,所以将去重操作直接放在了消费端:1)消费端处理消息的业务逻辑保持幂等性。那么不管来多少条重复消息,可以实现处理的结果都一样。
消息幂等的必要性:
处理方式:业务端做防重。在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageID,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageID来作为判断幂等的关键依据。但是,这个MessageID是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。
应用场景:超时未支付订单处理场景、短信延迟发送
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;预设值的延迟时间间隔。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
1、messageDelayLevel配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间;
时间单位支持:s、m、h、d,分别表示秒、分、时、天;默认值就是上面声明的,可手工调整;
2、注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)
。
3、level有以下三种情况:
- level == 0,消息为非延迟消息
- 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
- level > maxLevel,则level== maxLevel,例如level==20,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX
的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。
顺序消费的原理解析:在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。需要将有序的一组消息都存入同一个MessageQueue里,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
RocketMQ事务消息是一种特殊类型的消息,用于在分布式事务中保证消息的一致性和可靠性。在分布式系统中,经常会涉及到多个步骤的操作,需要保证这些操作要么全部成功,要么全部失败,以保持数据的一致性。
RocketMQ事务消息的实现方式如下:
通过以上的机制,RocketMQ事务消息可以保证在分布式事务中,消息的发送和本地事务的执行具有原子性,从而保证了消息的一致性和可靠性。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。