当前位置:   article > 正文

【真实案例】消息消费失败如何处理?

消息队列消费失败怎么办

每天早上七点三十,准时推送干货

一、介绍

在介绍消息中间件 MQ 之前,我们先来简单的了解一下,为何要引用消息中间件。

例如,在电商平台中,常见的用户下单,会经历以下几个流程。

当用户下单时,创建完订单之后,会调用第三方支付平台,对用户的账户金额进行扣款,如果平台支付扣款成功,会将结果通知到对应的业务系统,接着业务系统会更新订单状态,同时调用仓库接口,进行减库存,通知物流进行发货!

试想一下,从订单状态更新、到扣减库存、通知物流发货都在一个方法内同步完成,假如用户支付成功、订单状态更新也成功,但是在扣减库存或者通知物流发货步骤失败了,那么就会造成一个问题,用户已经支付成功了,只是在仓库扣减库存方面失败,从而导致整个交易失败!

一单失败,老板可以假装看不见,但是如果上千个单子都因此失败,那么因系统造成的业务损失,将是巨大的,老板可能坐不住了!

因此,针对这种业务场景,架构师们引入了异步通信技术方案,从而保证服务的高可用,大体流程如下:

当订单系统收到支付平台发送的扣款结果之后,会将订单消息发送到 MQ 消息中间件,同时也会更新订单状态。

在另一端,由仓库系统来异步监听订单系统发送的消息,当收到订单消息之后,再操作扣减库存、通知物流公司发货等服务!

在优化后的流程下,即使扣减库存服务失败,也不会影响用户交易。

正如《人月神话》中所说的,软件工程,没有银弹

当引入了 MQ 消息中间件之后,同样也会带来另一个问题,假如 MQ 消息中间件突然宕机了,导致消息无法发送出去,那仓库系统就无法接受到订单消息,进而也无法发货!

针对这个问题,业界主流的解决办法是采用集群部署,一主多从模式,从而实现服务的高可用,即使一台机器突然宕机了,也依然能保证服务可用,在服务器故障期间,通过运维手段,将服务重新启动,之后服务依然能正常运行!

但是还有另一个问题,假如仓库系统已经收到订单消息了,但是业务处理异常,或者服务器异常,导致当前商品库存并没有扣减,也没有发货!

这个时候又改如何处理呢?

今天我们所要介绍的正是这种场景,假如消息消费失败,我们应该如何处理?

二、解决方案

针对消息消费失败的场景,我们一般会通过如下方式进行处理:

  • 当消息消费失败时,会对消息进行重新推送

  • 如果重试次数超过最大值,会将异常消息存储到数据库,然后人工介入排查问题,进行手工重试

当消息在客户端消费失败时,我们会将异常的消息加入到一个消息重试对象中,同时设置最大重试次数,并将消息重新推送到 MQ 消息中间件里,当重试次数超过最大值时,会将异常的消息存储到 MongoDB数据库中,方便后续查询异常的信息。

基于以上系统模型,我们可以编写一个公共重试组件,话不多说,直接干!

三、代码实践

本次补偿服务采用 rabbitmq 消息中间件进行处理,其他消息中间件处理思路也类似!

3.1、创建一个消息重试实体类
  1. @Data
  2. @EqualsAndHashCode(callSuper = false)
  3. @Accessors(chain = true)
  4. public class MessageRetryDTO implements Serializable {
  5.     private static final long serialVersionUID = 1L;
  6.     /**
  7.      * 原始消息body
  8.      */
  9.     private String bodyMsg;
  10.     /**
  11.      * 消息来源ID
  12.      */
  13.     private String sourceId;
  14.     /**
  15.      * 消息来源描述
  16.      */
  17.     private String sourceDesc;
  18.     /**
  19.      * 交换器
  20.      */
  21.     private String exchangeName;
  22.     /**
  23.      * 路由键
  24.      */
  25.     private String routingKey;
  26.     /**
  27.      * 队列
  28.      */
  29.     private String queueName;
  30.     /**
  31.      * 状态,1:初始化,2:成功,3:失败
  32.      */
  33.     private Integer status = 1;
  34.     /**
  35.      * 最大重试次数
  36.      */
  37.     private Integer maxTryCount = 3;
  38.     /**
  39.      * 当前重试次数
  40.      */
  41.     private Integer currentRetryCount = 0;
  42.     /**
  43.      * 重试时间间隔(毫秒)
  44.      */
  45.     private Long retryIntervalTime = 0L;
  46.     /**
  47.      * 任务失败信息
  48.      */
  49.     private String errorMsg;
  50.     /**
  51.      * 创建时间
  52.      */
  53.     private Date createTime;
  54.     @Override
  55.     public String toString() {
  56.         return "MessageRetryDTO{" +
  57.                 "bodyMsg='" + bodyMsg + '\'' +
  58.                 ", sourceId='" + sourceId + '\'' +
  59.                 ", sourceDesc='" + sourceDesc + '\'' +
  60.                 ", exchangeName='" + exchangeName + '\'' +
  61.                 ", routingKey='" + routingKey + '\'' +
  62.                 ", queueName='" + queueName + '\'' +
  63.                 ", status=" + status +
  64.                 ", maxTryCount=" + maxTryCount +
  65.                 ", currentRetryCount=" + currentRetryCount +
  66.                 ", retryIntervalTime=" + retryIntervalTime +
  67.                 ", errorMsg='" + errorMsg + '\'' +
  68.                 ", createTime=" + createTime +
  69.                 '}';
  70.     }
  71.     /**
  72.      * 检查重试次数是否超过最大值
  73.      *
  74.      * @return
  75.      */
  76.     public boolean checkRetryCount() {
  77.         retryCountCalculate();
  78.         //检查重试次数是否超过最大值
  79.         if (this.currentRetryCount < this.maxTryCount) {
  80.             return true;
  81.         }
  82.         return false;
  83.     }
  84.     /**
  85.      * 重新计算重试次数
  86.      */
  87.     private void retryCountCalculate() {
  88.         this.currentRetryCount = this.currentRetryCount + 1;
  89.     }
  90. }
3.2、编写服务重试抽象类
  1. public abstract class CommonMessageRetryService {
  2.     private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);
  3.     @Autowired
  4.     private RabbitTemplate rabbitTemplate;
  5.     @Autowired
  6.     private MongoTemplate mongoTemplate;
  7.     /**
  8.      * 初始化消息
  9.      *
  10.      * @param message
  11.      */
  12.     public void initMessage(Message message) {
  13.         log.info("{} 收到消息: {},业务数据:{}", this.getClass().getName(), message.toString(), new String(message.getBody()));
  14.         try {
  15.             //封装消息
  16.             MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
  17.             if (log.isInfoEnabled()) {
  18.                 log.info("反序列化消息:{}", messageRetryDto.toString());
  19.             }
  20.             prepareAction(messageRetryDto);
  21.         } catch (Exception e) {
  22.             log.warn("处理消息异常,错误信息:", e);
  23.         }
  24.     }
  25.     /**
  26.      * 准备执行
  27.      *
  28.      * @param retryDto
  29.      */
  30.     protected void prepareAction(MessageRetryDTO retryDto) {
  31.         try {
  32.             execute(retryDto);
  33.             doSuccessCallBack(retryDto);
  34.         } catch (Exception e) {
  35.             log.error("当前任务执行异常,业务数据:" + retryDto.toString(), e);
  36.             //执行失败,计算是否还需要继续重试
  37.             if (retryDto.checkRetryCount()) {
  38.                 if (log.isInfoEnabled()) {
  39.                     log.info("重试消息:{}", retryDto.toString());
  40.                 }
  41.                 retrySend(retryDto);
  42.             } else {
  43.                 if (log.isWarnEnabled()) {
  44.                     log.warn("当前任务重试次数已经到达最大次数,业务数据:" + retryDto.toString(), e);
  45.                 }
  46.                 doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
  47.             }
  48.         }
  49.     }
  50.     /**
  51.      * 任务执行成功,回调服务(根据需要进行重写)
  52.      *
  53.      * @param messageRetryDto
  54.      */
  55.     private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
  56.         try {
  57.             successCallback(messageRetryDto);
  58.         } catch (Exception e) {
  59.             log.warn("执行成功回调异常,队列描述:{},错误原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
  60.         }
  61.     }
  62.     /**
  63.      * 任务执行失败,回调服务(根据需要进行重写)
  64.      *
  65.      * @param messageRetryDto
  66.      */
  67.     private void doFailCallBack(MessageRetryDTO messageRetryDto) {
  68.         try {
  69.             saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));
  70.             failCallback(messageRetryDto);
  71.         } catch (Exception e) {
  72.             log.warn("执行失败回调异常,队列描述:{},错误原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
  73.         }
  74.     }
  75.     /**
  76.      * 执行任务
  77.      *
  78.      * @param messageRetryDto
  79.      */
  80.     protected abstract void execute(MessageRetryDTO messageRetryDto);
  81.     /**
  82.      * 成功回调
  83.      *
  84.      * @param messageRetryDto
  85.      */
  86.     protected abstract void successCallback(MessageRetryDTO messageRetryDto);
  87.     /**
  88.      * 失败回调
  89.      *
  90.      * @param messageRetryDto
  91.      */
  92.     protected abstract void failCallback(MessageRetryDTO messageRetryDto);
  93.     /**
  94.      * 构建消息补偿实体
  95.      * @param message
  96.      * @return
  97.      */
  98.     private MessageRetryDTO buildMessageRetryInfo(Message message){
  99.         //如果头部包含补偿消息实体,直接返回
  100.         Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
  101.         if(messageHeaders.containsKey("message_retry_info")){
  102.             Object retryMsg = messageHeaders.get("message_retry_info");
  103.             if(Objects.nonNull(retryMsg)){
  104.                 return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
  105.             }
  106.         }
  107.         //自动将业务消息加入补偿实体
  108.         MessageRetryDTO messageRetryDto = new MessageRetryDTO();
  109.         messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
  110.         messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
  111.         messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
  112.         messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
  113.         messageRetryDto.setCreateTime(new Date());
  114.         return messageRetryDto;
  115.     }
  116.     /**
  117.      * 异常消息重新入库
  118.      * @param retryDto
  119.      */
  120.     private void retrySend(MessageRetryDTO retryDto){
  121.         //将补偿消息实体放入头部,原始消息内容保持不变
  122.         MessageProperties messageProperties = new MessageProperties();
  123.         messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
  124.         messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));
  125.         Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);
  126.         rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
  127.     }
  128.     /**
  129.      * 将异常消息存储到mongodb中
  130.      * @param retryDto
  131.      */
  132.     private void saveMessageRetryInfo(MessageRetryDTO retryDto){
  133.         try {
  134.             mongoTemplate.save(retryDto, "message_retry_info");
  135.         } catch (Exception e){
  136.             log.error("将异常消息存储到mongodb失败,消息数据:" + retryDto.toString(), e);
  137.         }
  138.     }
  139. }
3.3、编写监听服务类

在消费端应用的时候,也非常简单,例如,针对扣减库存操作,我们可以通过如下方式进行处理!

  1. @Component
  2. public class OrderServiceListener extends CommonMessageRetryService {
  3.     private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);
  4.     /**
  5.      * 监听订单系统下单成功消息
  6.      * @param message
  7.      */
  8.     @RabbitListener(queues = "mq.order.add")
  9.     public void consume(Message message) {
  10.         log.info("收到订单下单成功消息: {}", message.toString());
  11.         super.initMessage(message);
  12.     }
  13.     @Override
  14.     protected void execute(MessageRetryDTO messageRetryDto) {
  15.         //调用扣减库存服务,将业务异常抛出来
  16.     }
  17.     @Override
  18.     protected void successCallback(MessageRetryDTO messageRetryDto) {
  19.         //业务处理成功,回调
  20.     }
  21.     @Override
  22.     protected void failCallback(MessageRetryDTO messageRetryDto) {
  23.         //业务处理失败,回调
  24.     }
  25. }

当消息消费失败,并超过最大次数时,会将消息存储到 mongodb 中,然后像常规数据库操作一样,可以通过 web 接口查询异常消息,并针对具体场景进行重试!

四、小结

可能有的同学会问,为啥不将异常消息存在数据库?

起初的确是存储在 MYSQL 中,但是随着业务的快速发展,订单消息数据结构越来越复杂,数据量也非常的大,甚至大到 MYSQL 中的 text 类型都无法存储,同时这种数据结构也不太适合在 MYSQL 中存储,因此将其迁移到 mongodb!

本文主要围绕消息消费失败这种场景,进行基础的方案和代码实践讲解,可能有理解不到位的地方,欢迎批评指出!

五、参考

1、石杉的架构笔记 - 如何处理消息消费失败问题

PS:公号内回复「java」即可进入Java 新手学习交流群,一起成长进步!老规矩,兄弟们还记得么,右下角的 “在看” 点一下,如果感觉文章内容不错的话,记得分享朋友圈让更多的人知道!

求关注

最近大家应该发现微信公众号信息流改版了吧,再也不是按照时间顺序展示了。这就对阿粉这样的坚持的原创小号主,可以说非常打击,阅读量直线下降,正反馈持续减弱。

如果想在第一时间收到阿粉的文章,不被公号的信息流影响,那么可以给Java极客技术设为一个星标

最后感谢各位的阅读,才疏学浅,难免存在纰漏,如果你发现错误的地方,留言告诉阿粉,阿粉这么宠你们,肯定会改的~

最后谢谢大家支持~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/996779
推荐阅读
相关标签
  

闽ICP备14008679号