当前位置:   article > 正文

关于RocketMq消息积压问题排查_rocket mq消息进行消费了但是堆积还在上升

rocket mq消息进行消费了但是堆积还在上升

1、最近生产的mq出现了一个问题:我的消费者是集群(就是双节点),现在消息积压到1亿多条,如下图所示
在这里插入图片描述
其中有两个问题:问题1,就是为什么我的消息积压这么多?问题2:我的消费者是集群,为什么只有一台消息在消费

2、接着开始排查问题,结果发现在消费端的逻辑代码里面有异常,因为我们在mq里面配置了消息重试机制,及重试机制的间隔时间,所以消息一直卡在那。但是问题来了,我们对异常代码进行捕获,在catch里面也有对mq进行返回,如下图所示

    @Override
    public OrderAction consume(final Message message, final ConsumeOrderContext context) {

        log.info("MQ消息消费者监听消息内容:{}", message);

        MqMessageRecordMo mqMessageRecordMo = new MqMessageRecordMo();
        try {
            String body = new String(message.getBody());
            String tag = message.getTag();
            mqMessageRecordMo.setMsgId(message.getMsgID());
            mqMessageRecordMo.setOrderTopic(message.getTopic());
            mqMessageRecordMo.setProducerIp(message.getBornHost());
            mqMessageRecordMo.setTag(tag);
            //mqMessageRecordMo.setMessageKey(message.getKey());
            mqMessageRecordMo.setShardingKey(message.getShardingKey());
            mqMessageRecordMo.setBodyJson(body);
            mqMessageRecordMo.setProducerTime(LocalDateLUtils.timestampToDatetime(message.getBornTimestamp()));
            mqMessageRecordMo.setCreateTime(LocalDateTime.now());
            mqMessageRecordMo.setPMsgId(message.getUserProperties("pMsgId"));

            log.info("MQ消费者消息消费成功,解析并处理相应的业务逻辑, tag = {},key = {},messageId = {}", tag, message.getShardingKey(),message.getMsgID());

            DataBaseEnum dataBase = DataBaseEnum.getEnum(message.getShardingKey());
            targetServiceImpl.process(message.getMsgID(),dataBase,body);

            log.info("MQ消息体消费监听解析结果:{}", body);
            mqMessageRecordMo.setIsSuccess(true);
            return OrderAction.Success;
        } catch (Exception e) {
            //消费失败,挂起当前队列
            // 存储错误消息,重试,记录日志
            log.error("target MQ消费者消息监听消息业务逻辑处理失败:",e);
            mqMessageRecordMo.setIsSuccess(false);
            mqMessageRecordMo.setErrorMsg(e.getMessage());
            return OrderAction.Success;
        } finally {
            mqMessageRecordDao.save(mqMessageRecordMo);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

如上图代码所示,在catch里面有 return OrderAction.Success; 这行代码,即使消息消费失败后,重试几次后,理论上这条消息也是会消费完毕。但是事实不是这样的,消息一直在积压,没有被消费掉,所以唯一的解释就是,catch里面的 return OrderAction.Success这行代码没有生效。果然,经过排查发现
mqMessageRecordMo.setErrorMsg(e.getMessage()); 这种写法是有异常的,在catch里面他有异常,结果return OrderAction.Success;这行代码就不会往下走,所以造成mq没有收到成功确认的回执,所以异常消息一直没有消费成功,积压在mq里面。
3、解决方案:修改上述代码里面的

targetServiceImpl.process(message.getMsgID(),dataBase,body);
  • 1

这个方法里面的异常代码,其次,把catch里面的保存异常的代码换一种写法,如下

		catch (Exception e) {
            //消费失败,挂起当前队列
            // 存储错误消息,重试,记录日志
            mqMessageRecordMo.setIsSuccess(false);
            StringWriter sw = new StringWriter();
            e.printStackTrace(new PrintWriter(sw,true));
            mqMessageRecordMo.setErrorMsg("event消息接收处理失败:" + sw.toString());
            log.error("target-MQ消费者消息监听消息业务逻辑处理失败:",e);
            return OrderAction.Success;
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

4、经过上述修改后,消息正常消费。
5、但是还有一个问题,就是消息为什么都积压在同一台服务器上(我的消费者明明是两台服务器),其次,有什么办法加快消息的消费速度,如图1所示,mq的消费速度为每秒60条。
6、首先我尝试了加大消费者的消费线程数,如下图所示:

rocket:
  mq:
    dc:
      consume-thread-nums: 60
      max-reconsume-times: 3
  • 1
  • 2
  • 3
  • 4
  • 5

我把 consume-thread-nums 由10改成了60,结果发现消费速率还是这么多。
7、后面我观察积压的消息的特征,发现所有的消息都是同一个
shardingKey,看到这里,我就明白了,因为我们项目所用的rocketMq是阿里巴巴的,我们在项目里面设置的是顺序消费(还有一个乱序消费,不保证顺序),而Roctmq的集群消息是根据shardingKey 分区的,在同一个区里面的消息是按照顺序来的。所以这也就能说明为啥我的1亿多条消息都发送到同一台服务器上,因为这些消息都同属于一个shardingKey,同时也能说明为啥我调消费者的线程数没有用,因为同一个shardingKey 分区里面的消息是单线程消费。

8、后续优化,把shardingKey 分区粒度调小一些,尽可能分散。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/712568
推荐阅读
相关标签
  

闽ICP备14008679号