赞
踩
当消息出现大量挤压后,消费端将其代码优化后,重启消费端服务器,从 rocketmq-console 上发现 TPS 为 0,如图所示:
乍一看,第一时间得出应用还未恢复,就开始去查看相关的启动日志,通常查看的是应用服务器的 /home/baseuser/logs/rockemqlogs/rocketmq_client.logs,碰巧又看到如下的错误日志:
RebalanceService - [BUG] ConsumerGroup: consumergourp-1 The consumerId: consumer-client-id-clusterA-192.168.x.x@21932 not in cidAll: [consumer-client-id-clusterA-192.168.x.x@22164]
上面的日志显示在队列负载时候,当前节点竟然不属于 consumergourp-1 消费组的活跃连接,导致一大片的报错:
2019-11-02 19:29:17 WARN NettyClientPublicExecutor_1 - execute the pull request exception
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 25 DESC: the consumer's subscription not latest
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:639)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:156)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:592)
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:51)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:275)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
乍一看确实是 rocketmq 相关的问题,导致上述 消费TPS 为0,经过半个小时的日志分析,发现这是RocketMQ 这是一种正常现象,最终会自动恢复,这里我留一个伏笔,将在我的知识星球中与广大星友讨论,经过日志分析得出 rocketmq 没问题,故后面去查看消息积压,发现消息积压明显在减少,那这就奇了怪了,咋消息积压在快速减少,但为啥消费TPS还是为0呢?
接下来将该问题进行探讨。
温馨提示:在问题分析部分,作者没有直接给出答案,而是一步一步探寻答案,因此会通过追踪源码来寻求答案,如果大家想急于答案,可以跳过问题分析,直接查看本文末尾的问题解答部分。
通过本文的阅读,您将获得如下信息:
1、RocketMQ 消费TPS的收集与计算逻辑。
2、RocketMQ 监控指标的设计思路。
3、RocketMQ 主从同步,消费者从主服务器拉取还是从从服务器拉取的判断逻辑。
要解开消费TPS 显示为0的问题,我们首先要来看一下 rocketmq-console 这个页面的展示逻辑,即通过阅读 rocketmq-console的源码来解开其采集逻辑。
得知,【消费者】界面查询各个消费组的基本信息的接口为 /consumer/groupList.query,那接下来,我们首先从源码的角度来分析该接口的实现逻辑。其入口如下:
org.apache.rocketmq.console.controller.ConsumerController#list @RequestMapping(value = "/groupList.query") @ResponseBody public Object list() { return consumerService.queryGroupList(); } 就是调用消费服务处理类的 queryGroupList 方法,其实现代码如下: ConsumerServiceImpl#queryGroupList public List<GroupConsumeInfo> queryGroupList() { Set<String> consumerGroupSet = Sets.newHashSet(); try { ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); // @1 for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) { // @2 SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L); // @3 consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()); } } catch (Exception err) { throw Throwables.propagate(err); } List<GroupConsumeInfo> groupConsumeInfoList = Lists.newArrayList(); for (String consumerGroup : consumerGroupSet) { // @4 groupConsumeInfoList.add(queryGroup(consumerGroup)); } Collections.sort(groupConsumeInfoList); return groupConsumeInfoList; }
代码@1:获取集群的 broker 信息,主要是通过向 NameServer 发送 GET_BROKER_CLUSTER_INFO 请求,NameServer 返回集群包含的所有 broker 信息,包含从节点的信息,返回的格式如下:
"clusterInfo": { "brokerAddrTable": { "broker-a": { "cluster": "DefaultCluster", "brokerName": "broker-a", "brokerAddrs": { "0": "192.168.0.168:10911", "1": "192.168.0.169:10911" } }, "broker-b": { "cluster": "DefaultCluster", "brokerName": "broker-b", "brokerAddrs": { "0": "192.168.0.170:10911", "1": "192.168.1.171:10911" } } }, "clusterAddrTable": { "DefaultCluster": ["broker-a","broker-b"] } }
代码@2:遍历集群中的 brokerAddrTable 数据结构,即存储了 broker 的地址信息的 Map 。
代码@3:分别向集群中的主节点(brokerData.selectBrokerAddr()) 获取所有的订阅关系(即消费组的订阅信息)。然后将所有的消费者组名称存入 consumerGroupSet。
代码@4:遍历代码@3收集到的消费组,调用 queryGroup 依次请求消费组的运行时信息,后面接下来详细分析。
接下来将重点分析 queryGroup方法的实现细节。
ConsumerServiceImpl#queryGroup
public GroupConsumeInfo queryGroup(String consumerGroup) { GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo(); try { ConsumeStats consumeStats = null; try { consumeStats = mqAdminExt.examineConsumeStats(consumerGroup); // @1 } catch (Exception e) { logger.warn("examineConsumeStats exception, " + consumerGroup, e); } ConsumerConnection consumerConnection = null; try { consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup); } catch (Exception e) { logger.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e); } groupConsumeInfo.setGroup(consumerGroup); if (consumeStats != null) { groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps()); // @2 groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff()); // @3 } if (consumerConnection != null) { groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size()); groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel()); groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType()); groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion())); } } catch (Exception e) { logger.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " + consumerGroup, e); } return groupConsumeInfo; }
从上面@1,@2,@3这三处代码可以得知,rocketmq-console 相关界面上的消费TPS主要来自 examineConsumeStats 方法,该方法我就不再继续深入,我们只需找到该方法向 broker 发送的请求编码,然后根据该请求编码找到 broker 的处理逻辑即可,最后跟踪发送的请求编码为:RequestCode.GET_CONSUME_STATS。
GET_CONSUME_STATS 命令在 broker 的处理逻辑如下:
AdminBrokerProcessor#getConsumeStats
private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetConsumeStatsRequestHeader requestHeader = (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class); ConsumeStats consumeStats = new ConsumeStats(); Set<String> topics = new HashSet<String>(); if (UtilAll.isBlank(requestHeader.getTopic())) { topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup()); } else { topics.add(requestHeader.getTopic()); } for (String topic : topics) { // @1 TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); if (null == topicConfig) { // @2 log.warn("consumeStats, topic config not exist, {}", topic); continue; } { SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); // @3 if (null == findSubscriptionData // && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) { log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic); continue; } } for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { // @4 MessageQueue mq = new MessageQueue(); mq.setTopic(topic); mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); mq.setQueueId(i); OffsetWrapper offsetWrapper = new OffsetWrapper(); long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); if (brokerOffset < 0) brokerOffset = 0; long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(// requestHeader.getConsumerGroup(), // topic, // i); if (consumerOffset < 0) consumerOffset = 0; offsetWrapper.setBrokerOffset(brokerOffset); // @5 offsetWrapper.setConsumerOffset(consumerOffset); // @6 long timeOffset = consumerOffset - 1; if (timeOffset >= 0) { long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset); if (lastTimestamp > 0) { offsetWrapper.setLastTimestamp(lastTimestamp); // @7 } } consumeStats.getOffsetTable().put(mq, offsetWrapper); // @8 } double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(requestHeader.getConsumerGroup(), topic); // @9 consumeTps += consumeStats.getConsumeTps(); // @10 consumeStats.setConsumeTps(consumeTps); } byte[] body = consumeStats.encode(); response.setBody(body); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
该方法比较长,重点关注如下关键点:
代码@1:遍历该消费组订阅的所有主题。消费TPS将是所有主题消费TPS的总和,其他的信息按主题、队列信息单独存放。
代码@2:如果 topic 的元信息不存在,则跳过该主题。
代码@3:如果消费组的订阅信息不存在,则跳过该订阅关系。
代码@4:收集该主题所有的读队列,以messagequeue为键,OffsetWrapper为值存储在 consumeStats.getOffsetTable() ,见代码@8。
代码@5:设置该队列的最新偏移量。
代码@6:设置该消费组对该队列的消费进度,设置为consumeOffset。
代码@7:lastTimestamp 上一次消费的消息的存储时间,实现逻辑为:取消费组对于队列的消息消费进度 -1 的消息,存储在 broker 的时间,如果对应的消息已过期被删除,则在界面上显示的时间就会为1970-01-01 08:00:00。
代码@9:通过 BrokerStatsManager 的 tpsGroupGetNums 方法从统计数据中获取该消费组针对该队列的消费TPS。
代码@10:累积消费TPS,并最终作为该消费组的总TPS。
上面这个方法非常关键,是返回给前段页面核心的数据组装逻辑,以队列、消费组为纬度给出 brokerOffset、consumeOffset、lastTimestamp。然后将数据返回给前段页面进行展示。
接下将聚焦到消费组消费TPS的统计处理,其入口为 tpsGroupGetNums 。
首先我们还是从 tpsGroupGetNums 方法入手,探究一下 tps 的获取逻辑,然后再探究数据的采集原理(这也是 rocketmq 监控相关)。
BrokerStatsManager#tpsGroupGetNums
public double tpsGroupGetNums(final String group, final String topic) {
final String statsKey = buildStatsKey(topic, group); // @1
return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps(); // @2
}
代码@1:构建统计key,其逻辑为:其键为:topic@consumerGroup,即消息主题@消费组名。
要读懂 代码@2 的代码,先来看一下 rocketmq 监控指标的存储数据结构,如下图所示:
正如上图所示:RocketMQ 使用 HashMap< String, StatusItemSet> 来存储监控收集的数据,其中Key 为监控指标的类型,例如 topic 发送消息数量、topic 发送消息大小、消费组获取消息个数等信息,每一项使用 StatsItemSet 存储,该存储结构内部又维护一个HashMap:ConcurrentMap,key 代表某一个具体的统计目标,例如记录消费组拉取消息的数量监控指标,那其统计的对象即 topic@consumer_group,最终数据的载体是 StatsItem,使用如下几个关键字段来记录统计信息:
了解了上述存储结构后,代码@2,最终其实调用的就是 StatsItemSet 的 getStatsDataInMinute 方法。
StatsItemSet#getStatsDataInMinute
public StatsSnapshot getStatsDataInMinute(final String statsKey) {
StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null != statsItem) {
return statsItem.getStatsDataInMinute();
}
return new StatsSnapshot();
}
从代码上最终调用 StatesItem 的 getStatsDataInMinute 方法。
StatesItem#getStatsDataInMinute
public StatsSnapshot getStatsDataInMinute() { return computeStatsData(this.csListMinute); } private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) { StatsSnapshot statsSnapshot = new StatsSnapshot(); synchronized (csList) { double tps = 0; double avgpt = 0; long sum = 0; if (!csList.isEmpty()) { CallSnapshot first = csList.getFirst(); // @1 CallSnapshot last = csList.getLast(); // @2 sum = last.getValue() - first.getValue(); // @3 tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp()); // @4 long timesDiff = last.getTimes() - first.getTimes(); if (timesDiff > 0) { // @5 avgpt = (sum * 1.0d) / timesDiff; } } statsSnapshot.setSum(sum); statsSnapshot.setTps(tps); statsSnapshot.setAvgpt(avgpt); } return statsSnapshot; }
代码@1:首先取快照中的第一条消息。
代码@2:取快照列表中的最后一条消息。
代码@3:计算这两个时间点 value 的差值,即这段时间内新增的总数。
代码@4:计算这段时间内的tps,即每秒处理的消息条数。
代码@5:计算 avgpt ,即平均一次操作新增的消息条数(即平均一次操作,value 新增的个数)。
消费组的消费TPS的计算逻辑就介绍到这里了,那还有一个疑问,即 StatsItem 中 csListMinute 中的数据从哪来呢?
StatsItem#init
public void init() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
samplingInSeconds();
} catch (Throwable ignored) {
}
}
}, 0, 10, TimeUnit.SECONDS);
// 省略其他代码
}
原来在创建一个新的StatsItem 的时候,就会启动一个定时任务,每隔 10s 调用 samplingInSeconds 方法进行抽样,那我们简单看一下这个方法:
StatsItem#samplingInSeconds
public void samplingInSeconds() {
synchronized (this.csListMinute) {
this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
.get()));
if (this.csListMinute.size() > 7) {
this.csListMinute.removeFirst();
}
}
}
就是将当前StatsItem 中的 value 与 变更次数(time ) 存入封装成 CallSnapshot ,然后存储在快照列表中。这里的关键是times values 这些值在什么情况下会改变呢?
接着往下看,源码在消息拉取的时候,会将本次拉取的信息加入到统计信息中,其入口为:
PullMessageProcessor#processRequest
switch (response.getCode()) {
case ResponseCode.SUCCESS:
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
// 省略其他代码
}
该方法会最终更新 StatsItem 中的 values ,而 times 是 每调用一次,加1。
理论基础讲解完毕后,接下来我们来回答一下题目中的现象。
按照上面的讲解,通过 rocketmq-console 发起查看消费组的TPS时,Broker 会根据过去一分钟内采集的快照数据进行计算。快照信息的采集机制是 broker 端会每10s 会记录一下消费组对应的拉取消息数量与拉取次数。
那既然消息延迟(堆积数量在不断减少),说明消费端正在消费,按道理来说,通过上述机制进行计算,TPS 不可能会是0?那又是什么原因呢?
如果TPS为0,可以说明消费端并没有向 broker 拉取消息,因为一旦从 broker 拉取消息,有关 StatsItem 的 拉取消息总数(value) 与 拉取次数(times) 再两次采集国产中肯定不会相等,只要两者有差距,其TPS就不可能为0,那消费组在消费消息,但又不从主节点上拉取消息,这种情况会出现吗?
答案是会的,在 RocketMQ 主从同步架构中,如果需要访问的消息偏移量与当前 commitlog 最大偏移的之间的差距超过了内存的40%,消息消费将由从节点接管,故此时消费的拉取不会去主节点拉取,故上面返回的TPS就会为0。这样就能完美解答了。
经过上面的分析,我相信大家已经非常认可这个原因了,其实我们还有一个重要的论据,大家可以分别去查看 Rocketmq 主从节点 /home/{username}/logs/rocketmqlogs/stats.log,里面会每隔1分钟在日志中打印各个消费组的消费TPS.
从服务器(rocketmq-slave)对应的日志如下:
INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 785717 TPS: 15714.34 AVGPT: 8.14
INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 940522 TPS: 15675.37 AVGPT: 8.06
主服务器(rocketmq-master)对应的日志如下:
INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 0 TPS: 0.00 AVGPT: 0.00
INFO - [GROUP_GET_NUMS] [t1@c1] Stats In One Minute, SUM: 0 TPS: 0.00 AVGPT: 0.00
主服务器上的TPS一定会0吗?不一定,其实也不一定。这里借着这波日志,再来总结一下 RocketMQ 主从同步时的切换逻辑。
1、如果消费端请求的消息物理偏移量与 broker 当前最新的物理偏移量之间的差距查过内存的40%,下一次拉取会往从节点发送(当然前提是slaveReadEnable = true)。
2、当从节点开始接管消息消费时,下一次拉取请求一定会往从节点发送码?答案也是不一定:
关于RocketMQ 主从同步若干问题答疑,可以参考笔者的另外一篇文章:RocketMQ 主从同步若干答疑 。
本文就介绍到这里了,如果觉得对您有帮助的话,麻烦帮忙点个赞,非常感谢您的鼓励与支持。
见文如面,我是威哥,热衷于成体系剖析JAVA主流中间件,关注公众号『中间件兴趣圈』,回复专栏可获取成体系专栏导航,回复资料可以获取笔者的学习思维导图。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。