赞
踩
Q:业务系统之间通过MQ进行交互,当消费者发生未知异常时,消息消费失败,如何处理才保证消息的消费的可靠性。
A:
从如下几点考虑
何时ack
无论消息成功还是失败,都会ack,消息不会堆积在MQ中
只有成功才ack,消息堆积在MQ中
消费日志
接收消息后先入库
入库失败,直接开始重试
入库成功,开始后续业务逻辑
消息id,不能重复入库,可以更新状态,比如重试次数,执行状态
记录从哪儿来的
队列名
交换器
路由键
消息数据
状态
消费失败重试
原则:
可以重新消费消息
可定制重试间隔时间与重试次数
集群环境下的重试,如果本节点出问题时,可由其他节点消费,尽快完成业务逻辑
重试状态记录
达到重试上限(时间、次数)后,变更数据,并酌情通知人工干预
可选方案
spring-rabbitmq 自带重试机制探究
利用死信机制实现重试
补偿
根据最终失败状态的消息,进行补偿,补偿逻辑需要业务方实现。
spring-rabbitmq 自带重试机制
spring-rabbitmq,重试机制也是在当前系统内存中重试。
可实现功能
定制重试间隔机制
间隔时间
间隔时间幂因子
最大时间间隔
最大重试次数
重试过程监听
重试开始前回调
每次重试失败时回调
达到最大重试次数,结束重试时回调
重试结束时,消息处理策略MessageRecoverer
ImmediateRequeueMessageRecoverer,
立即重新返回队列
如果存在其他消费者,此消息可被其他消费者重新消费
消息会一直重试下去,需要自己处理,跳出重试循环
RejectAndDontRequeueRecoverer
拒绝消息,并且不会重新回到队列
消息会被投递到死信交换器/死信队列,如果当前队列有相关配置
仅仅在本节点中重试,直到失败
RepublishMessageRecoverer
重新发布出去,且消息头中会携带异常堆栈信息,x-exception
可以指定exchange,如果不指定将使用一些默认值,
可以考虑异步处理失败消息。
重试结束时,如何更新DB中消息的状态
重试中消费成功,可在消费者method中处理,直接更新消息状态
结束时,消费失败
异常处理errorHandler
此处理器,每次失败都会触发,可以拿到消息的信息
可以考虑每次更新DB中消息的重试次数,或者状态
重试次数在数据库中累加,
但是会增加DB压力
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。