赞
踩
1、最近生产的mq出现了一个问题:我的消费者是集群(就是双节点),现在消息积压到1亿多条,如下图所示
其中有两个问题:问题1,就是为什么我的消息积压这么多?问题2:我的消费者是集群,为什么只有一台消息在消费
2、接着开始排查问题,结果发现在消费端的逻辑代码里面有异常,因为我们在mq里面配置了消息重试机制,及重试机制的间隔时间,所以消息一直卡在那。但是问题来了,我们对异常代码进行捕获,在catch里面也有对mq进行返回,如下图所示
@Override public OrderAction consume(final Message message, final ConsumeOrderContext context) { log.info("MQ消息消费者监听消息内容:{}", message); MqMessageRecordMo mqMessageRecordMo = new MqMessageRecordMo(); try { String body = new String(message.getBody()); String tag = message.getTag(); mqMessageRecordMo.setMsgId(message.getMsgID()); mqMessageRecordMo.setOrderTopic(message.getTopic()); mqMessageRecordMo.setProducerIp(message.getBornHost()); mqMessageRecordMo.setTag(tag); //mqMessageRecordMo.setMessageKey(message.getKey()); mqMessageRecordMo.setShardingKey(message.getShardingKey()); mqMessageRecordMo.setBodyJson(body); mqMessageRecordMo.setProducerTime(LocalDateLUtils.timestampToDatetime(message.getBornTimestamp())); mqMessageRecordMo.setCreateTime(LocalDateTime.now()); mqMessageRecordMo.setPMsgId(message.getUserProperties("pMsgId")); log.info("MQ消费者消息消费成功,解析并处理相应的业务逻辑, tag = {},key = {},messageId = {}", tag, message.getShardingKey(),message.getMsgID()); DataBaseEnum dataBase = DataBaseEnum.getEnum(message.getShardingKey()); targetServiceImpl.process(message.getMsgID(),dataBase,body); log.info("MQ消息体消费监听解析结果:{}", body); mqMessageRecordMo.setIsSuccess(true); return OrderAction.Success; } catch (Exception e) { //消费失败,挂起当前队列 // 存储错误消息,重试,记录日志 log.error("target MQ消费者消息监听消息业务逻辑处理失败:",e); mqMessageRecordMo.setIsSuccess(false); mqMessageRecordMo.setErrorMsg(e.getMessage()); return OrderAction.Success; } finally { mqMessageRecordDao.save(mqMessageRecordMo); } }
如上图代码所示,在catch里面有 return OrderAction.Success; 这行代码,即使消息消费失败后,重试几次后,理论上这条消息也是会消费完毕。但是事实不是这样的,消息一直在积压,没有被消费掉,所以唯一的解释就是,catch里面的 return OrderAction.Success这行代码没有生效。果然,经过排查发现
mqMessageRecordMo.setErrorMsg(e.getMessage()); 这种写法是有异常的,在catch里面他有异常,结果return OrderAction.Success;这行代码就不会往下走,所以造成mq没有收到成功确认的回执,所以异常消息一直没有消费成功,积压在mq里面。
3、解决方案:修改上述代码里面的
targetServiceImpl.process(message.getMsgID(),dataBase,body);
这个方法里面的异常代码,其次,把catch里面的保存异常的代码换一种写法,如下
catch (Exception e) {
//消费失败,挂起当前队列
// 存储错误消息,重试,记录日志
mqMessageRecordMo.setIsSuccess(false);
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw,true));
mqMessageRecordMo.setErrorMsg("event消息接收处理失败:" + sw.toString());
log.error("target-MQ消费者消息监听消息业务逻辑处理失败:",e);
return OrderAction.Success;
}
4、经过上述修改后,消息正常消费。
5、但是还有一个问题,就是消息为什么都积压在同一台服务器上(我的消费者明明是两台服务器),其次,有什么办法加快消息的消费速度,如图1所示,mq的消费速度为每秒60条。
6、首先我尝试了加大消费者的消费线程数,如下图所示:
rocket:
mq:
dc:
consume-thread-nums: 60
max-reconsume-times: 3
我把 consume-thread-nums 由10改成了60,结果发现消费速率还是这么多。
7、后面我观察积压的消息的特征,发现所有的消息都是同一个
shardingKey,看到这里,我就明白了,因为我们项目所用的rocketMq是阿里巴巴的,我们在项目里面设置的是顺序消费(还有一个乱序消费,不保证顺序),而Roctmq的集群消息是根据shardingKey 分区的,在同一个区里面的消息是按照顺序来的。所以这也就能说明为啥我的1亿多条消息都发送到同一台服务器上,因为这些消息都同属于一个shardingKey,同时也能说明为啥我调消费者的线程数没有用,因为同一个shardingKey 分区里面的消息是单线程消费。
8、后续优化,把shardingKey 分区粒度调小一些,尽可能分散。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。