赞
踩
public class RebalanceService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
默认每隔20秒则执行在平衡
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
}
public void doRebalance() {
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
public void doRebalance() {
if (!this.pause) {
MessageListenerOrderly MessageListenerConcurrently 看是顺序还是并发
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
public void doRebalance(final boolean isOrder) { 获取当前的订阅信息subscriptionInner Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { 遍历订阅信息的主题 for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { 对每个主题进行再平衡 this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }
private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { ...... 删除广播模式代码 集群模式 case CLUSTERING: { Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); 通过broker获取同一个consumerGroup所有启动的clientId List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); ...... 删除其他代码 if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); 排序后交给strategy执行分配负载 只要排序算法的入参一样出参永远一样 避免分布式一致性投票的复杂性和可靠性问题 Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueAveragely AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; 表示当前clientId应该消费哪些MessageQueue List<MessageQueue> allocateResult = null; ...... 删除其他代码 allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); 去重 Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } 判断自己的消费是否发生了变化 底层比较重要的是在必要的时候创建PullRequest 而pullRequest被被PuMessage boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { ...... 删除日志代码 消费者进程注册broker this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } } }
public List findConsumerIdList(final String topic, final String group) {
可能有多个broker 随机获取一个 消费者进程会向所有broker注册自己
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null == brokerAddr) {
this.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.findBrokerAddrByTopic(topic);
}
if (null != brokerAddr) {
try {
通过远程调用获取broker上注册的消费者进程集合
return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
} catch (Exception e) {
log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
}
}
return null;
}
消费者进程集合 | 消息队列集合 |
---|---|
cid1,cid2,cid3 | mq1,mq2,mq3,mq4,mq5,mq6,mq7,mq8 |
cid1消费mq1,mq2,mq3
cid2消费mq4,mq5,mq6
cid3消费mq7,mq8
消费者进程集合 | 消息队列集合 |
---|---|
cid1,cid2,cid3 | mq1,mq2,mq3,mq4,mq5,mq6,mq7,mq8 |
cid1消费mq1,mq4,mq7
cid2消费mq2,mq5,mq8
cid3消费mq3,mq6
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { boolean changed = false; Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); ...... 删除代码:重新负载后mqSet不存在processQueueTable依旧存在 List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { 构建新的MessageQueue与ProcessQueue映射 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { } else { 如果MessageQueue之前未曾消费过 一个PullRequest======一个MessageQueue ========一个ProcessQueue log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { } } } 拉取broker消息 交给PullMessageService执行拉取任务 this.dispatchPullRequest(pullRequestList); return changed; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。