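Picking up where the previous consumer article left off, this section traces the two most important pieces of consumer code: rebalancing and offset commits!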
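First, let's look at what triggers a rebalance:
1. A new consumer joins the consumer group.
2. A consumer crashes, or fails to send a heartbeat within the configured time (for example because of network delays).
3. A consumer voluntarily leaves the consumer group.
4. The number of partitions of a topic the consumer group subscribes to changes.
5. A consumer calls unsubscribe() to cancel its topic subscriptions.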
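We divide a rebalance into three phases. Phase 1: find the GroupCoordinator. When a consumer comes online, it does not yet know which broker node is the GroupCoordinator for its group. Phase 2: join the group by sending a JoinGroupRequest and handling the response. Phase 3: enter the Synchronizing Group State phase by sending a SyncGroupRequest and handling the response.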
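The following minimal Java sketch models the three phases as a small state machine. Phase and RebalanceFlow are hypothetical names, not Kafka classes, and the sketch assumes that every response succeeds. It shows only two things: the order of the three request types, and the fact that a later rejoin normally skips phase 1 because the coordinator is still known.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the three rebalance phases (not Kafka code).
enum Phase { FIND_COORDINATOR, JOIN_GROUP, SYNC_GROUP, STABLE }

class RebalanceFlow {
    private Phase phase = Phase.FIND_COORDINATOR;
    private final List<String> sentRequests = new ArrayList<>();

    Phase phase() { return phase; }
    List<String> sentRequests() { return sentRequests; }

    // Each phase sends one request type and, assuming success, moves to the next phase.
    void step() {
        switch (phase) {
            case FIND_COORDINATOR:
                sentRequests.add("FindCoordinatorRequest");
                phase = Phase.JOIN_GROUP;
                break;
            case JOIN_GROUP:
                sentRequests.add("JoinGroupRequest");
                phase = Phase.SYNC_GROUP;
                break;
            case SYNC_GROUP:
                sentRequests.add("SyncGroupRequest");
                phase = Phase.STABLE;
                break;
            case STABLE:
                break; // nothing to do until the next rebalance trigger
        }
    }

    // A trigger such as a member joining or leaving restarts at JOIN_GROUP,
    // because the coordinator is usually still known.
    void requestRejoin() {
        if (phase == Phase.STABLE)
            phase = Phase.JOIN_GROUP;
    }

    public static void main(String[] args) {
        RebalanceFlow flow = new RebalanceFlow();
        while (flow.phase() != Phase.STABLE)
            flow.step();
        System.out.println(flow.sentRequests());
        // [FindCoordinatorRequest, JoinGroupRequest, SyncGroupRequest]
    }
}
```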
1.1 Phase 1
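To find the GroupCoordinator, the consumer sends a GroupCoordinatorRequest to any node in the cluster. The request carries only one field, the groupId. In the code below the request has been renamed FindCoordinatorRequest, and it also carries a coordinator type, GROUP. The cluster answers with a GroupCoordinatorResponse. The response contains the coordinator node's id, host and port, plus an error code (a short) that helps diagnose problems.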
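The article does not show how the broker computes its answer. On the broker side, a group's coordinator is the leader of one partition of the internal __consumer_offsets topic. That partition is chosen by hashing the groupId modulo the partition count, and the count comes from offsets.topic.num.partitions, which defaults to 50. The sketch below assumes that mapping. CoordinatorLookup is a hypothetical name. Its abs() guards against Integer.MIN_VALUE, whose Math.abs is still negative.

```java
// Hypothetical sketch of the broker-side groupId -> __consumer_offsets partition mapping.
class CoordinatorLookup {
    // Math.abs(Integer.MIN_VALUE) is negative, so map that one value to 0.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // The leader of this partition acts as the group's coordinator.
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        return abs(groupId.hashCode()) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        int partition = partitionFor("my-group", 50); // 50 = default offsets.topic.num.partitions
        System.out.println("coordinator = leader of __consumer_offsets-" + partition);
    }
}
```

Every consumer of the same group computes the same hash. As a result, any broker can answer the lookup consistently, as long as it knows who leads that partition.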
Note that poll(), in the ConsumerCoordinator class, is the main method that drives the entire rebalance flow:
```java
public void poll(long now, long remainingMs) {
    invokeCompletedOffsetCommitCallbacks();

    if (subscriptions.partitionsAutoAssigned()) {
        if (coordinatorUnknown()) {
            ensureCoordinatorReady(); // phase 1
            now = time.milliseconds();
        }
        ...
    }
    ...
}
```
First, look at the check inside the if. If the coordinator is unknown, we have to locate it with a GroupCoordinatorRequest.
```java
public boolean coordinatorUnknown() {
    return coordinator() == null;
}
```
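Now to the main part: sending the request and handling the response. The method below is only the entry point for sending the request. The actual send() lies deeper. The method is in AbstractCoordinator, the parent class of ConsumerCoordinator.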
----- Request creation and sending -----------
```java
protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
    long remainingMs = timeoutMs;
    while (coordinatorUnknown()) {
        RequestFuture<Void> future = lookupCoordinator(); // send [step into]
        client.poll(future, remainingMs);                 // wait for the result
        if (future.failed()) {
            if (future.isRetriable()) {
                remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
                if (remainingMs <= 0)
                    break;
                // block until the cluster metadata is refreshed, then retry
                client.awaitMetadataUpdate(remainingMs);
            } else
                throw future.exception();
        } else if (coordinator != null && client.connectionFailed(coordinator)) {
            // coordinator found, but the connection failed: mark it dead
            // (which fails the unsent requests queued for it), back off, and look it up again
            coordinatorDead();
            time.sleep(retryBackoffMs);
        }
        remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
        if (remainingMs <= 0)
            break;
    }
    return !coordinatorUnknown();
}
```
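The loop above is a deadline-bounded retry. Each iteration recomputes the remaining budget from the start time, and the loop gives up once the budget reaches zero. Below is a minimal Java sketch of the same pattern. DeadlineRetry, Clock and Sleeper are hypothetical names. The clock and sleep are injected so the loop can be tested without real waiting. The single backoff sleep is a simplification that stands in for both awaitMetadataUpdate() and time.sleep(retryBackoffMs) in the original.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the ensureCoordinatorReady() retry pattern (not Kafka code).
class DeadlineRetry {
    interface Clock { long nowMs(); }
    interface Sleeper { void sleep(long ms); }

    // Retries 'attempt' until it returns true or the timeout budget is used up.
    static boolean retryUntil(BooleanSupplier attempt, long timeoutMs, long backoffMs,
                              Clock clock, Sleeper sleeper) {
        long startMs = clock.nowMs();
        while (true) {
            if (attempt.getAsBoolean())
                return true; // coordinator found
            long remainingMs = timeoutMs - (clock.nowMs() - startMs);
            if (remainingMs <= 0)
                return false; // budget exhausted, like the 'break' in the original loop
            sleeper.sleep(Math.min(backoffMs, remainingMs)); // never sleep past the deadline
        }
    }

    public static void main(String[] args) {
        long[] fakeNow = {0};
        int[] calls = {0};
        boolean ok = retryUntil(() -> ++calls[0] >= 3, 1000, 100,
                () -> fakeNow[0], ms -> fakeNow[0] += ms);
        System.out.println(ok + " after " + calls[0] + " attempts"); // true after 3 attempts
    }
}
```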
To see how the GroupCoordinatorRequest is created, step into lookupCoordinator():
```java
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        Node node = this.client.leastLoadedNode(); // pick the least-loaded node to send the request to
        if (node == null) {
            log.debug("No broker available to send GroupCoordinator request");
            return RequestFuture.noBrokersAvailable();
        } else
            findCoordinatorFuture = sendGroupCoordinatorRequest(node); // [step into]
    }
    return findCoordinatorFuture;
}
```
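Its main job is to pick the least-loaded node, meaning the one with the fewest in-flight requests, to send the request to. It also reuses a lookup that is already in flight (findCoordinatorFuture). One level deeper: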
```java
private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
    // initiate the group metadata request
    log.debug("Sending GroupCoordinator request to broker {}", node);
    FindCoordinatorRequest.Builder requestBuilder =
            new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
    return client.send(node, requestBuilder)
            .compose(new GroupCoordinatorResponseHandler()); // send, then adapt the response
}
```
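lookupCoordinator() combines two small ideas. It sends the lookup to the least-loaded node, and it shares a lookup that is already in flight instead of sending another. The sketch below shows both with CompletableFuture. CoordinatorFinder is hypothetical, and node load is a plain map of in-flight request counts. The sketch also assumes that the cached future is cleared once the lookup completes. The excerpt does not show where Kafka resets findCoordinatorFuture.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of lookupCoordinator()'s node choice and future reuse (not Kafka code).
class CoordinatorFinder {
    private final Map<String, Integer> inFlightByNode;                    // node -> in-flight request count
    private final Function<String, CompletableFuture<String>> sendLookup; // sends the lookup to one node
    private CompletableFuture<String> findCoordinatorFuture;              // null when no lookup is pending
    int requestsSent = 0;

    CoordinatorFinder(Map<String, Integer> inFlightByNode,
                      Function<String, CompletableFuture<String>> sendLookup) {
        this.inFlightByNode = inFlightByNode;
        this.sendLookup = sendLookup;
    }

    // The node with the fewest in-flight requests, or null if no node is known.
    static String leastLoadedNode(Map<String, Integer> inFlightByNode) {
        String best = null;
        int bestCount = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> e : inFlightByNode.entrySet()) {
            if (e.getValue() < bestCount) {
                best = e.getKey();
                bestCount = e.getValue();
            }
        }
        return best;
    }

    synchronized CompletableFuture<String> lookupCoordinator() {
        if (findCoordinatorFuture != null)
            return findCoordinatorFuture; // a lookup is already in flight: share it
        String node = leastLoadedNode(inFlightByNode);
        if (node == null) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("no brokers available"));
            return failed;
        }
        requestsSent++;
        CompletableFuture<String> future = sendLookup.apply(node);
        findCoordinatorFuture = future;
        // forget the pending lookup once it finishes, so a later call can retry
        future.whenComplete((result, error) -> clearIfPending(future));
        return future;
    }

    private synchronized void clearIfPending(CompletableFuture<String> future) {
        if (findCoordinatorFuture == future)
            findCoordinatorFuture = null;
    }

    public static void main(String[] args) {
        Map<String, Integer> load = new HashMap<>();
        load.put("broker-1", 2);
        load.put("broker-2", 0);
        CompletableFuture<String> pending = new CompletableFuture<>();
        CoordinatorFinder finder = new CoordinatorFinder(load, node -> pending);
        finder.lookupCoordinator();
        finder.lookupCoordinator(); // reuses the pending future, no second request
        System.out.println(leastLoadedNode(load) + ", requests sent: " + finder.requestsSent);
        // broker-2, requests sent: 1
    }
}
```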
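Here we find where the coordinator request is built. It is created directly with new, and its parameters are simple, essentially just the groupId. This Builder class is a static nested class of FindCoordinatorRequest. Why design it this way? One likely reason is that the builder only collects the fields. The request object itself is built later, once the client knows which protocol version the target broker supports.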
```java
public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> {
    private final String coordinatorKey;           // the groupId
    private final CoordinatorType coordinatorType; // GROUP
    private final short minVersion;
    ...
}
```
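The following sketch illustrates the deferred-build idea. The builder holds the request's fields. The network layer calls build(version) only after it knows the highest API version that both sides support. The RequestBuilder base class, the FindCoordinatorRequestSketch class and its supported range of versions 0 to 1 are all hypothetical and are not Kafka's actual code.

```java
// Hypothetical base class: holds the supported version range and builds on demand.
abstract class RequestBuilder<T> {
    final short minVersion;
    final short maxVersion;

    RequestBuilder(short minVersion, short maxVersion) {
        this.minVersion = minVersion;
        this.maxVersion = maxVersion;
    }

    abstract T build(short version);

    // Pick the highest version both sides support, then build the request.
    T buildFor(short brokerMaxVersion) {
        short version = (short) Math.min(maxVersion, brokerMaxVersion);
        if (version < minVersion)
            throw new IllegalStateException("broker too old: supports up to v" + brokerMaxVersion);
        return build(version);
    }
}

// Hypothetical request with a nested Builder, mirroring FindCoordinatorRequest.Builder's shape.
final class FindCoordinatorRequestSketch {
    final String coordinatorKey; // the groupId
    final short version;

    FindCoordinatorRequestSketch(String coordinatorKey, short version) {
        this.coordinatorKey = coordinatorKey;
        this.version = version;
    }

    static final class Builder extends RequestBuilder<FindCoordinatorRequestSketch> {
        private final String coordinatorKey;

        Builder(String coordinatorKey) {
            super((short) 0, (short) 1); // assumed version range for this sketch
            this.coordinatorKey = coordinatorKey;
        }

        @Override
        FindCoordinatorRequestSketch build(short version) {
            return new FindCoordinatorRequestSketch(coordinatorKey, version);
        }
    }

    public static void main(String[] args) {
        FindCoordinatorRequestSketch req = new Builder("my-group").buildFor((short) 3);
        System.out.println(req.coordinatorKey + " v" + req.version); // my-group v1
    }
}
```

Because the builder is created first, the same builder object can be queued and later turned into a request of whatever version the chosen broker accepts.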
Finally, the client sends the request. Note the RequestFutureCompletionHandler here. It is also an inner class.
```java
public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) {
    long now = time.milliseconds();
    RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
    ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
            completionHandler);
    unsent.put(node, clientRequest);

    // wakeup the client in case it is blocking in poll so that we can send the queued request
    client.wakeup();
    return completionHandler.future;
}
```
--------- Response handling ---------------
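Let's analyze the callback process.
First, send() returns a future. What is that future?
It is a RequestFuture<ClientResponse>, and it pairs with the ClientRequest we created. Let's look at the fields of ClientResponse.
The ClientResponse class: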
```java
public class ClientResponse {
    private final RequestHeader requestHeader;
    private final RequestCompletionHandler callback; // ★
    private final String destination;
    private final long receivedTimeMs;
    private final long latencyMs;
    private final boolean disconnected;
    private final UnsupportedVersionException versionMismatch;
    private final AbstractResponse responseBody;
    ...
}
```
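Note the field marked ★. Recall that the call to send() had a tail, .compose(new GroupCoordinatorResponseHandler()). This is where the two connect. The callback stored in the ClientResponse is the RequestFutureCompletionHandler created in send(), and it completes the future when the response arrives. compose() then hands the result to GroupCoordinatorResponseHandler. So let's look at how this response handler processes it: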
```java
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
    ...
    if (error == Errors.NONE) { // no error
        synchronized (AbstractCoordinator.this) { // record the discovered coordinator
            // MAX_VALUE - node id gives the coordinator its own connection id,
            // separate from the regular connection to the same broker
            AbstractCoordinator.this.coordinator = new Node(
                    Integer.MAX_VALUE - findCoordinatorResponse.node().id(),
                    findCoordinatorResponse.node().host(),
                    findCoordinatorResponse.node().port());
            log.info("Discovered coordinator {}", coordinator);
            client.tryConnect(coordinator);
            heartbeat.resetTimeouts(time.milliseconds());
        }
        future.complete(null);
    } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
        future.raise(new GroupAuthorizationException(groupId));
    } else {
        log.debug("Group coordinator lookup failed: {}", error.message());
        future.raise(error);
    }
}
```
That's it: at this point the coordinator has been discovered and initialized!
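The callback chain traced above can be modeled in a few lines. It starts with a future of raw ClientResponse values, which compose() adapts into a future of the lookup result. MiniFuture and its Adapter interface are hypothetical, single-threaded stand-ins for RequestFuture and its response handlers, and the broker address string is made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded stand-in for RequestFuture (not Kafka code).
class MiniFuture<T> {
    private boolean done;
    private T value;
    private RuntimeException error;
    private final List<Runnable> listeners = new ArrayList<>();

    // Plays the role of a response handler: turns an F result into progress on an S future.
    interface Adapter<F, S> {
        void onSuccess(F value, MiniFuture<S> future);
        default void onFailure(RuntimeException e, MiniFuture<S> future) { future.raise(e); }
    }

    boolean succeeded() { return done && error == null; }
    T value() { return value; }
    RuntimeException exception() { return error; }

    void complete(T v) {
        if (done) throw new IllegalStateException("already completed");
        value = v;
        fire();
    }

    void raise(RuntimeException e) {
        if (done) throw new IllegalStateException("already completed");
        error = e;
        fire();
    }

    private void fire() {
        done = true;
        for (Runnable l : listeners) l.run();
        listeners.clear();
    }

    private void onDone(Runnable l) {
        if (done) l.run(); else listeners.add(l);
    }

    // Returns a new future that the adapter completes once this one finishes.
    <S> MiniFuture<S> compose(Adapter<T, S> adapter) {
        MiniFuture<S> adapted = new MiniFuture<>();
        onDone(() -> {
            if (error == null) adapter.onSuccess(value, adapted);
            else adapter.onFailure(error, adapted);
        });
        return adapted;
    }
}

class ComposeDemo {
    static String coordinator; // set by the adapter, like AbstractCoordinator.this.coordinator

    public static void main(String[] args) {
        MiniFuture<String> raw = new MiniFuture<>(); // completed by the "network layer"
        MiniFuture<Void> lookup = raw.compose((response, future) -> {
            coordinator = response;
            future.complete(null);
        });
        raw.complete("broker-2:9092"); // simulate the response arriving
        System.out.println(lookup.succeeded() + " " + coordinator); // true broker-2:9092
    }
}
```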
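1.2 Phase 2
Once the GroupCoordinator has been found, phase 2 begins. The consumer sends a JoinGroupRequest to the GroupCoordinator and handles the response.
The entry point is again the poll() method in the ConsumerCoordinator class: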
```java
public void poll(long now, long remainingMs) {
    ...
    if (needRejoin()) {
        if (subscriptions.hasPatternSubscription())
            client.ensureFreshMetadata();
        ensureActiveGroup(); // [step into] phase 2
        now = time.milliseconds();
    }
    ...
}
```
needRejoin() decides whether the consumer has to (re)join the group:
```java
public boolean needRejoin() {
    if (!subscriptions.partitionsAutoAssigned()) // only applies to automatic partition assignment (subscribe)
        return false;
    // we need to rejoin if we performed the assignment and metadata has changed
    if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
        return true;
    // we need to join if our subscription has changed since the last join
    if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
        return true;
    return super.needRejoin();
}
```
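needRejoin() is where triggers 4 and 5 from the list at the top surface on the client. Trigger 4 shows up as a metadata change after this member computed the assignment, and trigger 5 as a subscription change since the last join. Below is a self-contained restatement. It assumes hypothetical plain types: the snapshots are maps from topic name to partition count, and a boolean stands in for super.needRejoin(). RejoinCheck is not a Kafka class.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical restatement of the needRejoin() checks (not Kafka code).
class RejoinCheck {
    boolean partitionsAutoAssigned;
    Map<String, Integer> assignmentSnapshot; // topic -> partitions when this member (as leader) assigned, or null
    Map<String, Integer> metadataSnapshot;   // topic -> partitions in the current metadata
    Set<String> joinedSubscription;          // topics subscribed at the last join, or null
    Set<String> subscription;                // topics subscribed now
    boolean rejoinNeeded;                    // stands in for super.needRejoin()

    boolean needRejoin() {
        if (!partitionsAutoAssigned)
            return false; // manual assign(): no group rebalance
        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
            return true;  // e.g. a topic's partition count changed after we assigned
        if (joinedSubscription != null && !joinedSubscription.equals(subscription))
            return true;  // subscription changed since the last join
        return rejoinNeeded;
    }

    public static void main(String[] args) {
        RejoinCheck c = new RejoinCheck();
        c.partitionsAutoAssigned = true;
        c.joinedSubscription = new HashSet<>(Arrays.asList("orders"));
        c.subscription = new HashSet<>(Arrays.asList("orders", "payments"));
        System.out.println(c.needRejoin()); // true: subscription changed since the last join
    }
}
```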
Now suppose we need to rejoin and the GroupCoordinator has already been found. Let's keep tracing.
We arrive in AbstractCoordinator, the parent class of ConsumerCoordinator:
```java
public void ensureActiveGroup() {
    ensureCoordinatorReady();       // make sure the coordinator has been found
    startHeartbeatThreadIfNeeded(); // start the heartbeat thread
    joinGroupIfNeeded();            // join the group if needed
}
```
The second method starts the heartbeat thread and is simple:
```java
private synchronized void startHeartbeatThreadIfNeeded() {
    if (heartbeatThread == null) {
        heartbeatThread = new HeartbeatThread();
        heartbeatThread.start();
    }
}
```
Now the main part, the joinGroupIfNeeded() method:
```java
void joinGroupIfNeeded() {
    while (needRejoin() || rejoinIncomplete()) {
        ensureCoordinatorReady(); // make sure the GroupCoordinator has been found
        if (needsJoinPrepare) {
            onJoinPrepare(generation.generationId, generation.memberId); // 1. preparation
            needsJoinPrepare = false;
        }

        RequestFuture<ByteBuffer> future = initiateJoinGroup(); // 2. [step into]
        client.poll(future); // block until the request completes

        if (future.succeeded()) {
            onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
            resetJoinGroupFuture();
            needsJoinPrepare = true;
        } else {
            resetJoinGroupFuture();
            RuntimeException exception = future.exception();
            if (exception instanceof UnknownMemberIdException ||
                    exception instanceof RebalanceInProgressException ||
                    exception instanceof IllegalGenerationException)
                continue;
            else if (!future.isRetriable())
                throw exception;
            time.sleep(retryBackoffMs);
        }
    }
}
```
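The error handling in joinGroupIfNeeded() comes down to three reactions. It retries at once for UnknownMemberIdException, RebalanceInProgressException and IllegalGenerationException. It backs off for other retriable errors, and it throws for anything else. The sketch below replays scripted outcomes through that decision. JoinLoop and its Outcome enum are hypothetical stand-ins for the exception types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of joinGroupIfNeeded()'s reaction to each join outcome (not Kafka code).
class JoinLoop {
    enum Outcome { SUCCESS, UNKNOWN_MEMBER_ID, REBALANCE_IN_PROGRESS, ILLEGAL_GENERATION, RETRIABLE, FATAL }

    // Replays scripted outcomes and records what the loop does after each one.
    static List<String> run(Iterator<Outcome> attempts) {
        List<String> actions = new ArrayList<>();
        while (attempts.hasNext()) {
            switch (attempts.next()) {
                case SUCCESS:
                    actions.add("onJoinComplete");
                    return actions; // rejoin no longer needed, so the loop exits
                case UNKNOWN_MEMBER_ID:
                case REBALANCE_IN_PROGRESS:
                case ILLEGAL_GENERATION:
                    actions.add("retry immediately"); // 'continue' in the original
                    break;
                case RETRIABLE:
                    actions.add("sleep retryBackoffMs");
                    break;
                case FATAL:
                    throw new IllegalStateException("non-retriable join failure");
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(
                Outcome.REBALANCE_IN_PROGRESS, Outcome.RETRIABLE, Outcome.SUCCESS).iterator()));
        // [retry immediately, sleep retryBackoffMs, onJoinComplete]
    }
}
```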
First, step 1, the preparation method onJoinPrepare():
```java
protected void onJoinPrepare(int generation, String memberId) {
    // commit offsets prior to rebalance if auto-commit enabled
    maybeAutoCommitOffsetsSync(rebalanceTimeoutMs); // one synchronous offset commit

    // execute the user's callback before rebalance
    ConsumerRebalanceListener listener = subscriptions.listener();
    log.info("Revoking previously assigned partitions {}", subscriptions.assignedPartitions());
    try {
        Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsRevoked(revoked);
    } catch (WakeupException | InterruptException e) {
        throw e;
    } catch (Exception e) {
        log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
    }

    isLeader = false;
    subscriptions.resetGroupSubscription();
}
```
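onJoinPrepare() calls the user's ConsumerRebalanceListener inside a try/catch, and onJoinComplete() does the same later. Wakeup and interrupt exceptions propagate. Everything else is logged and swallowed, so a faulty callback cannot abort the rebalance. The sketch below shows that pattern. ListenerInvoker, RebalanceCallback and WakeupSignal are hypothetical stand-ins for the Kafka types, and the log is a plain list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of the "call the user's listener safely" pattern (not Kafka code).
class ListenerInvoker {
    // Stands in for WakeupException / InterruptException, which must propagate.
    static class WakeupSignal extends RuntimeException {}

    interface RebalanceCallback {
        void onPartitionsRevoked(Collection<String> partitions);
    }

    static final List<String> log = new ArrayList<>();

    static void revoke(RebalanceCallback listener, Collection<String> assigned) {
        try {
            listener.onPartitionsRevoked(new ArrayList<>(assigned)); // pass a copy, like the excerpt's new HashSet<>(...)
        } catch (WakeupSignal e) {
            throw e; // let the caller see wakeups
        } catch (Exception e) {
            log.add("listener failed on partition revocation: " + e.getMessage()); // swallow and log
        }
    }

    public static void main(String[] args) {
        revoke(partitions -> { throw new RuntimeException("commit failed"); }, Arrays.asList("orders-0"));
        System.out.println(log); // [listener failed on partition revocation: commit failed]
    }
}
```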
Step 2 is initiateJoinGroup().
----- Request creation and sending -----------
```java
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    if (joinFuture == null) {
        disableHeartbeatThread(); // pause heartbeating during the rebalance
        state = MemberState.REBALANCING;
        joinFuture = sendJoinGroupRequest(); // [step into] send
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                synchronized (AbstractCoordinator.this) {
                    log.info("Successfully joined group with generation {}", generation.generationId);
                    state = MemberState.STABLE;
                    rejoinNeeded = false;
                    if (heartbeatThread != null)
                        heartbeatThread.enable();
                }
            }

            @Override
            public void onFailure(RuntimeException e) {
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED;
                }
            }
        });
    }
    return joinFuture;
}
```
Going deeper, the request is created here:
```java
private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();
    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
            groupId,
            this.sessionTimeoutMs,
            this.generation.memberId,
            protocolType(),
            metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
    return client.send(coordinator, requestBuilder)
            .compose(new JoinGroupResponseHandler());
}
```
From here on it follows the same routine as sending any request: attach a callback that handles the response.
JoinGroupRequest fields:
```java
private final String groupId;
private final int sessionTimeout;    // if no heartbeat arrives within this time, the consumer is considered dead
private final int rebalanceTimeout;
private final String memberId;       // the ID the GroupCoordinator assigned to this consumer
private final String protocolType;   // the protocol being implemented, "consumer" by default
private final List<ProtocolMetadata> groupProtocols; // per PartitionAssignor: this consumer's serialized subscription
```
--------- Response handling ---------------
We jump straight to the response handling in the JoinGroupResponseHandler class:
```java
if (error == Errors.NONE) {
    log.debug("Received successful JoinGroup response: {}", joinResponse);
    sensors.joinLatency.record(response.requestLatencyMs());

    synchronized (AbstractCoordinator.this) {
        if (state != MemberState.REBALANCING) {
            // if the consumer was woken up before a rebalance completes, we may have already left
            // the group. In this case, we do not want to continue with the sync group.
            future.raise(new UnjoinedGroupException());
        } else { // the normal path
            AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
                    joinResponse.memberId(), joinResponse.groupProtocol());
            if (joinResponse.isLeader()) { // this consumer is the leader
                onJoinLeader(joinResponse).chain(future);
            } else { // this consumer is a follower
                onJoinFollower().chain(future);
            }
        }
    }
}
```
JoinGroupResponse fields:
```java
private final int throttleTimeMs;
private final Errors error;
private final int generationId;                 // generation assigned by the GroupCoordinator
private final String groupProtocol;             // the assignor chosen by the GroupCoordinator
private final String memberId;                  // the ID the GroupCoordinator assigned to this consumer
private final String leaderId;                  // memberId of the leader
private final Map<String, ByteBuffer> members;  // each member's subscription metadata (only filled in for the leader)
```
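This is the key point. If the response designates this consumer as the leader, it must perform the partition assignment and send the result to the coordinator in its SyncGroupRequest. A follower also sends a SyncGroupRequest, but with an empty assignment, and receives its share of the result in the response.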
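The leader/follower split can be modeled from the coordinator's point of view. The leader's SyncGroupRequest carries the full memberId-to-assignment map, a follower's carries an empty one, and each member receives only its own entry. In this hypothetical sketch, SyncGroupModel is not a Kafka class, and assignments are plain strings instead of serialized ByteBuffers. Any non-empty map is accepted as the leader's. A null return stands for a response that the coordinator would hold back until the leader has synced.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical coordinator-side model of SyncGroup (not Kafka code).
class SyncGroupModel {
    private Map<String, String> groupAssignment; // memberId -> assignment, set by the leader's request

    // Returns this member's assignment, or null if the leader has not synced yet.
    String sync(String memberId, Map<String, String> assignmentFromRequest) {
        if (!assignmentFromRequest.isEmpty())
            groupAssignment = new HashMap<>(assignmentFromRequest); // only the leader sends a non-empty map
        if (groupAssignment == null)
            return null; // a real coordinator would delay this response instead
        return groupAssignment.getOrDefault(memberId, "");
    }

    public static void main(String[] args) {
        SyncGroupModel coordinator = new SyncGroupModel();
        Map<String, String> plan = new HashMap<>();
        plan.put("leader-1", "orders-0,orders-1");
        plan.put("follower-1", "orders-2");
        System.out.println(coordinator.sync("leader-1", plan));                    // orders-0,orders-1
        System.out.println(coordinator.sync("follower-1", Collections.emptyMap())); // orders-2
    }
}
```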
1. First, onJoinFollower(), the simpler of the two:
```java
private RequestFuture<ByteBuffer> onJoinFollower() {
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
                    Collections.<String, ByteBuffer>emptyMap());
    return sendSyncGroupRequest(requestBuilder);
}
```
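2. onJoinLeader()
It is almost the same as onJoinFollower(). The difference is that it puts the partition assignment result into the request, whereas the follower's request carries an empty map.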
```java
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // compute the assignment from every member's subscription
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                joinResponse.members());

        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}
```
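The partition assignment itself was covered in the Consumer (1) article. It mainly calls the assign() method of AbstractPartitionAssignor, which in turn calls the assign() of the configured strategy to obtain the final assignment.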
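As a reminder of what a strategy's assign() computes, here is a sketch of range-style assignment for one topic. It follows the scheme of Kafka's RangeAssignor. The members are sorted, each gets numPartitions / numMembers consecutive partitions, and the first numPartitions % numMembers members get one extra. RangeSketch is a hypothetical name, and the sketch handles a single topic only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-topic sketch of range assignment (not Kafka's RangeAssignor itself).
class RangeSketch {
    static Map<String, List<Integer>> assign(int numPartitions, List<String> members) {
        Map<String, List<Integer>> result = new TreeMap<>();
        if (members.isEmpty())
            return result;
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted); // assignment order depends only on member ids
        int perMember = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size(); // first 'extra' members get one more partition
        for (int i = 0; i < sorted.size(); i++) {
            int start = perMember * i + Math.min(i, extra);
            int length = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++)
                partitions.add(p);
            result.put(sorted.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(7, Arrays.asList("c2", "c1", "c3")));
        // {c1=[0, 1, 2], c2=[3, 4], c3=[5, 6]}
    }
}
```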
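Now let's unwind the call stack and follow the result back.
Once the future completes, the first stops are these two pieces (both appeared earlier):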
```java
// the listener attached to joinFuture in initiateJoinGroup()
public void onSuccess(ByteBuffer value) {
    synchronized (AbstractCoordinator.this) {
        state = MemberState.STABLE;
        rejoinNeeded = false;
        if (heartbeatThread != null)
            heartbeatThread.enable();
    }
}

// in joinGroupIfNeeded()
if (future.succeeded()) {
    onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
    resetJoinGroupFuture();
    needsJoinPrepare = true;
}
```
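The onJoinComplete() method is also very important. We will cover it in phase 3.
1.3 Phase 3
Once the partition assignment is done, the Synchronizing Group State phase begins. Its main logic is sending a SyncGroupRequest to the GroupCoordinator and handling the SyncGroupResponse.
As we saw, both the leader (after computing the assignment) and the followers send this request. The sending logic is simple, so here we mainly look at how the response is handled. Into the handler code: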
```java
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
    @Override
    public void handle(SyncGroupResponse syncResponse,
                       RequestFuture<ByteBuffer> future) {
        Errors error = syncResponse.error();
        if (error == Errors.NONE) {
            sensors.syncLatency.record(response.requestLatencyMs());
            // propagate the assignment result
            future.complete(syncResponse.memberAssignment());
        }
        ... // error handling elided
    }
}
```
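The result is actually processed in onJoinComplete(). joinGroupIfNeeded() calls it once the future has succeeded, as shown above, and it is implemented in ConsumerCoordinator: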
```java
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    if (!isLeader)
        assignmentSnapshot = null;
    // 1. look up the assignment strategy the coordinator selected
    PartitionAssignor assignor = lookupAssignor(assignmentStrategy); // is this strategy available locally?
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
    // 2. deserialize the assignment
    Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
    // set needsFetchCommittedOffsets to true, so the last committed offsets are fetched from the server
    subscriptions.needRefreshCommits();
    // fill in the set of assigned partitions
    subscriptions.assignFromSubscribed(assignment.partitions());
    Set<String> addedTopics = new HashSet<>();
    for (TopicPartition tp : subscriptions.assignedPartitions()) {
        if (!joinedSubscription.contains(tp.topic()))
            addedTopics.add(tp.topic());
    }

    if (!addedTopics.isEmpty()) {
        Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
        Set<String> newJoinedSubscription = new HashSet<>(joinedSubscription);
        newSubscription.addAll(addedTopics);
        newJoinedSubscription.addAll(addedTopics);

        this.subscriptions.subscribeFromPattern(newSubscription);
        this.joinedSubscription = newJoinedSubscription;
    }
    this.metadata.setTopics(subscriptions.groupSubscription());
    client.ensureFreshMetadata();
    // give the assignor its callback
    assignor.onAssignment(assignment);

    this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;

    ConsumerRebalanceListener listener = subscriptions.listener();
    log.info("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
    try {
        Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsAssigned(assigned);
    } catch (WakeupException | InterruptException e) {
        throw e;
    } catch (Exception e) {
        log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
    }
}
```
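The addedTopics block handles pattern subscriptions. The leader may assign partitions of topics that this member did not list when it joined, and the member then adds those topics to both subscription sets. The sketch below covers just the set logic. AddedTopics and the tiny TP class, which stands in for TopicPartition, are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class TP {
    final String topic;
    final int partition;

    TP(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Hypothetical sketch of the addedTopics step in onJoinComplete() (not Kafka code).
class AddedTopics {
    // Topics that appear in the assignment but not in the subscription this member joined with.
    static Set<String> added(Set<String> joinedSubscription, Collection<TP> assigned) {
        Set<String> result = new TreeSet<>();
        for (TP tp : assigned) {
            if (!joinedSubscription.contains(tp.topic))
                result.add(tp.topic);
        }
        return result;
    }

    // The new joined subscription: the old topics plus the added ones.
    static Set<String> merge(Set<String> joinedSubscription, Set<String> addedTopics) {
        Set<String> merged = new TreeSet<>(joinedSubscription);
        merged.addAll(addedTopics);
        return merged;
    }

    public static void main(String[] args) {
        Set<String> joined = new HashSet<>(Arrays.asList("orders"));
        List<TP> assigned = Arrays.asList(new TP("orders", 0), new TP("orders-eu", 1));
        Set<String> newTopics = added(joined, assigned);
        System.out.println(newTopics + " -> " + merge(joined, newTopics)); // [orders-eu] -> [orders, orders-eu]
    }
}
```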
Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.