
Kafka Source Code --- Consumer (2)


Continuing from the previous post on the consumer, in this part we trace the two most important pieces of consumer code: rebalancing and offset commits.

I. Rebalance

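First, let's look at the situations that trigger a rebalance:

1. A new consumer joins the group.

2. A consumer goes down, or (for example because of network delay) fails to send a heartbeat within the configured time.

3. A consumer voluntarily leaves the consumer group.

4. The number of partitions of a topic that the consumer group subscribes to changes.

5. A consumer calls unsubscribe() to cancel its topic subscription.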
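As a rough illustration of these cases, the sketch below compares two hypothetical snapshots of a group and reports which kinds of change occurred. Each snapshot has two parts: member id to subscribed topics, and topic to partition count. The class, the enum, and the snapshot shapes are assumptions made for this example.

In a real cluster, these events surface through group membership requests, session timeouts, and metadata refreshes, not through diffing maps like this. Cases 2 and 3 collapse into a single MEMBER_LEFT here.

```java
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative model only: these names are hypothetical, not Kafka classes. */
final class RebalanceTriggers {

    enum Trigger { MEMBER_JOINED, MEMBER_LEFT, SUBSCRIPTION_CHANGED, PARTITION_COUNT_CHANGED }

    /**
     * @param prevMembers    memberId -> subscribed topics, before
     * @param currMembers    memberId -> subscribed topics, after
     * @param prevPartitions topic -> partition count, before
     * @param currPartitions topic -> partition count, after
     */
    static Set<Trigger> detect(Map<String, Set<String>> prevMembers,
                               Map<String, Set<String>> currMembers,
                               Map<String, Integer> prevPartitions,
                               Map<String, Integer> currPartitions) {
        Set<Trigger> fired = EnumSet.noneOf(Trigger.class);
        for (String member : currMembers.keySet()) {
            if (!prevMembers.containsKey(member)) {
                fired.add(Trigger.MEMBER_JOINED);          // case 1: a new consumer joined
            } else if (!prevMembers.get(member).equals(currMembers.get(member))) {
                fired.add(Trigger.SUBSCRIPTION_CHANGED);   // case 5: this member's topic set changed
            }
        }
        for (String member : prevMembers.keySet()) {
            if (!currMembers.containsKey(member)) {
                fired.add(Trigger.MEMBER_LEFT);            // cases 2 and 3: timed out or left voluntarily
            }
        }
        Set<String> subscribed = new HashSet<>();
        currMembers.values().forEach(subscribed::addAll);
        for (String topic : subscribed) {
            // case 4: only compare topics whose partition count is known in both snapshots
            if (prevPartitions.containsKey(topic) && currPartitions.containsKey(topic)
                    && !prevPartitions.get(topic).equals(currPartitions.get(topic))) {
                fired.add(Trigger.PARTITION_COUNT_CHANGED);
            }
        }
        return fired;
    }
}
```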

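We split the rebalance into three phases:

- Phase 1: find the GroupCoordinator. When a consumer starts up, it does not yet know which broker coordinates its group.
- Phase 2: join the group, that is, send a JoinGroupRequest and handle the response.
- Phase 3: Synchronizing Group State. Here the consumer obtains its partition assignment through a SyncGroupRequest.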

1.1 Phase 1

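To find the GroupCoordinator, we send a GroupCoordinatorRequest to a node in the cluster. Any node will do; the code below picks the least loaded one. The request carries just one field, the groupId. The cluster replies with a GroupCoordinatorResponse containing three values for the coordinator node (id, host and port), plus an error code (a short, which helps with troubleshooting). In the version of the code shown below, this request is named FindCoordinatorRequest.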
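For context on how the receiving broker can answer, Kafka's broker code maps each group to one partition of the internal __consumer_offsets topic. The leader of that partition serves as the group's coordinator.

The sketch below reproduces that mapping as abs(groupId.hashCode()) % offsets.topic.num.partitions, where the partition count defaults to 50. The class name is hypothetical. abs() follows the convention of Kafka's Utils.abs, which maps Integer.MIN_VALUE to 0 so the result is never negative.

```java
/** Hypothetical helper mirroring how a broker maps a group id to a __consumer_offsets partition. */
final class CoordinatorPartition {

    /** Like Math.abs, but maps Integer.MIN_VALUE to 0 so the result is never negative. */
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    /** Partition of __consumer_offsets whose leader broker acts as the group coordinator. */
    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return abs(groupId.hashCode()) % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        // "a".hashCode() is 97, and 97 % 50 is 47
        System.out.println(partitionFor("a", 50));
    }
}
```

Because the mapping depends only on the group id, every consumer in the group is directed to the same coordinator. That holds no matter which broker it asks.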

Note that poll() is the main method that the whole rebalance flow goes through. It lives in the ConsumerCoordinator class:

```java
public void poll(long now, long remainingMs) {
    invokeCompletedOffsetCommitCallbacks();
    if (subscriptions.partitionsAutoAssigned()) {
        if (coordinatorUnknown()) {
            ensureCoordinatorReady(); // phase 1
            now = time.milliseconds();
        }
        ...
    }
    ...
}
```

First, the check in the if statement: if the coordinator is unknown, we need to find it with a GroupCoordinatorRequest.

```java
public boolean coordinatorUnknown() {
    return coordinator() == null;
}
```

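Now to the main topic: sending the request and handling the response. The method below is only the entry point for sending the request; the actual send() sits further down.

It lives in AbstractCoordinator, the parent class of ConsumerCoordinator.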

----- Creating and sending the request -----

```java
protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
    long remainingMs = timeoutMs;
    while (coordinatorUnknown()) {
        RequestFuture<Void> future = lookupCoordinator(); // send [step in]
        client.poll(future, remainingMs);                 // wait for the result
        if (future.failed()) {
            if (future.isRetriable()) {
                remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
                if (remainingMs <= 0)
                    break;
                // block until the cluster metadata has been refreshed, then retry
                client.awaitMetadataUpdate(remainingMs);
            } else
                throw future.exception();
        } else if (coordinator != null && client.connectionFailed(coordinator)) {
            // coordinator found, but the connection failed: mark it dead
            // (clearing its unsent requests) and look it up again
            coordinatorDead();
            time.sleep(retryBackoffMs);
        }
        remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
        if (remainingMs <= 0)
            break;
    }
    return !coordinatorUnknown();
}
```
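The loop above follows a common deadline pattern. After each attempt it recomputes the remaining budget, gives up once that budget reaches zero, and otherwise backs off and retries.

Here is a stripped-down sketch of that pattern. The clock and sleep are injected so it can be exercised without a network. The class and parameter names are hypothetical; this is not Kafka code.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/** Hypothetical, simplified version of the ensureCoordinatorReady() retry loop. */
final class DeadlineRetry {

    /**
     * Calls attempt until it returns true or the time budget is used up.
     * The first attempt always runs, mirroring the original while loop.
     *
     * @param clock   source of "now" in milliseconds
     * @param sleeper called with the back-off in ms (lets tests fake sleeping)
     */
    static boolean retryUntil(BooleanSupplier attempt, LongSupplier clock, LongConsumer sleeper,
                              long timeoutMs, long backoffMs) {
        long startMs = clock.getAsLong();
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;                                   // e.g. coordinator found
            }
            long remainingMs = timeoutMs - (clock.getAsLong() - startMs);
            if (remainingMs <= 0) {
                return false;                                  // the "break" branch: deadline reached
            }
            sleeper.accept(Math.min(backoffMs, remainingMs));  // back off, never past the deadline
        }
    }
}
```

Recomputing remainingMs from a fixed startMs (rather than subtracting per-iteration durations) keeps the total wait bounded even when individual steps take longer than expected.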

To see how the GroupCoordinatorRequest is created, let's step into lookupCoordinator():

```java
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        Node node = this.client.leastLoadedNode(); // pick the least loaded node to send to
        if (node == null) {
            log.debug("No broker available to send GroupCoordinator request");
            return RequestFuture.noBrokersAvailable();
        } else
            findCoordinatorFuture = sendGroupCoordinatorRequest(node); // [step in]
    }
    return findCoordinatorFuture;
}
```
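Two ideas in lookupCoordinator() are worth isolating:

- Reuse the in-flight lookup (findCoordinatorFuture), so repeated calls do not send duplicate requests.
- Pick the node with the least outstanding work.

Below is a simplified sketch using CompletableFuture. The node model (a map of in-flight request counts) and all names are assumptions for illustration. Kafka's real leastLoadedNode() also takes connection state into account.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical sketch; not Kafka's NetworkClient or AbstractCoordinator. */
final class CoordinatorLookup {
    private final Map<String, Integer> inFlightByNode;                 // nodeId -> outstanding requests
    private final Function<String, CompletableFuture<String>> sender;  // sends a FindCoordinator to a node
    private CompletableFuture<String> findCoordinatorFuture;           // shared while a lookup is pending
    int requestsSent = 0;

    CoordinatorLookup(Map<String, Integer> inFlightByNode,
                      Function<String, CompletableFuture<String>> sender) {
        this.inFlightByNode = inFlightByNode;
        this.sender = sender;
    }

    /** Node with the fewest in-flight requests, if any node is known. */
    Optional<String> leastLoadedNode() {
        return inFlightByNode.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    synchronized CompletableFuture<String> lookupCoordinator() {
        if (findCoordinatorFuture != null) {
            return findCoordinatorFuture;                   // reuse the pending lookup
        }
        Optional<String> node = leastLoadedNode();
        if (node.isEmpty()) {
            return CompletableFuture.failedFuture(new IllegalStateException("No broker available"));
        }
        requestsSent++;
        CompletableFuture<String> f = sender.apply(node.get());
        findCoordinatorFuture = f;
        // once this lookup finishes (either way), allow a fresh lookup next time
        f.whenComplete((result, error) -> clearIfCurrent(f));
        return f;
    }

    private synchronized void clearIfCurrent(CompletableFuture<String> f) {
        if (findCoordinatorFuture == f) {
            findCoordinatorFuture = null;
        }
    }
}
```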

Its main job is to pick the least loaded node to send the request to. Going one level deeper:

```java
private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
    // initiate the group metadata request
    log.debug("Sending GroupCoordinator request to broker {}", node);
    FindCoordinatorRequest.Builder requestBuilder =
            new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
    return client.send(node, requestBuilder)
            .compose(new GroupCoordinatorResponseHandler()); // send
}
```

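Here is where the coordinator request is built. It is simply created with new, and its arguments are minimal: the coordinator type (GROUP) and the groupId. The Builder class is a nested class of FindCoordinatorRequest. Why is it designed this way?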

```java
public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> {
    private final String coordinatorKey;           // the groupId
    private final CoordinatorType coordinatorType; // GROUP
    private final short minVersion;
    ...
}
```
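A likely answer to the question above is that the builder separates what to send from which protocol version to encode. The caller only captures the arguments. The network layer calls build(version) later, once it knows which API version the destination broker supports.

The sketch below shows that idea in isolation. All classes are simplified stand-ins, not Kafka's real ones. The version-0 restriction is modeled on Kafka, where the coordinator type field was added in a later request version; treat it as an illustrative assumption.

```java
/** Simplified stand-in for Kafka's AbstractRequest (hypothetical). */
abstract class AbstractRequest {
    private final short version;

    protected AbstractRequest(short version) { this.version = version; }

    short version() { return version; }

    /** Each request type supplies a builder that can produce any supported version. */
    abstract static class Builder<T extends AbstractRequest> {
        abstract T build(short version);
    }
}

/** Simplified stand-in for FindCoordinatorRequest (hypothetical). */
final class FindCoordinatorRequest extends AbstractRequest {
    enum CoordinatorType { GROUP, TRANSACTION }

    final String coordinatorKey;            // the groupId for GROUP lookups
    final CoordinatorType coordinatorType;

    private FindCoordinatorRequest(short version, String key, CoordinatorType type) {
        super(version);
        this.coordinatorKey = key;
        this.coordinatorType = type;
    }

    /** Nested builder: captures the arguments now, builds the versioned request later. */
    static final class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> {
        private final CoordinatorType type;
        private final String key;

        Builder(CoordinatorType type, String key) {
            this.type = type;
            this.key = key;
        }

        @Override
        FindCoordinatorRequest build(short version) {
            // assumption for illustration: version 0 carried only a group id
            if (version == 0 && type != CoordinatorType.GROUP) {
                throw new IllegalArgumentException("version 0 only supports GROUP lookups");
            }
            return new FindCoordinatorRequest(version, key, type);
        }
    }
}
```

Nesting the builder inside the request class also lets it call the private constructor, so a request cannot be created without going through the version-aware build step.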

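Finally, the client sends it in ConsumerNetworkClient.send(). Note the completion handler, RequestFutureCompletionHandler, which is also an inner class.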

```java
public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) {
    long now = time.milliseconds();
    RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
    ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
            completionHandler);
    unsent.put(node, clientRequest);
    // wakeup the client in case it is blocking in poll so that we can send the queued request
    client.wakeup();
    return completionHandler.future;
}
```
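The key point in send() is that nothing goes over the wire yet. The request is parked in unsent, and the caller gets a future that a later poll() completes.

Here is a toy model of that split. A hypothetical transport function stands in for the real network. It does not reproduce ConsumerNetworkClient's threading, wakeup, or timeout handling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/** Toy model of "enqueue in send(), transmit and complete in poll()" (hypothetical). */
final class QueueingClient {

    private static final class Pending {
        final String request;
        final CompletableFuture<String> future;

        Pending(String request, CompletableFuture<String> future) {
            this.request = request;
            this.future = future;
        }
    }

    private final Map<String, Queue<Pending>> unsent = new HashMap<>();  // node -> queued requests
    private final BiFunction<String, String, String> transport;          // (node, request) -> response

    QueueingClient(BiFunction<String, String, String> transport) {
        this.transport = transport;
    }

    /** Queue the request and hand back a future; nothing is transmitted here. */
    synchronized CompletableFuture<String> send(String node, String request) {
        CompletableFuture<String> future = new CompletableFuture<>();
        unsent.computeIfAbsent(node, n -> new ArrayDeque<>()).add(new Pending(request, future));
        return future;
    }

    /** Drain the queues, "transmit" each request and complete its future; returns how many. */
    synchronized int poll() {
        int completed = 0;
        // iterate over a copy so callbacks that send() to a new node don't break the loop
        for (Map.Entry<String, Queue<Pending>> e : new ArrayList<>(unsent.entrySet())) {
            Pending p;
            while ((p = e.getValue().poll()) != null) {
                try {
                    p.future.complete(transport.apply(e.getKey(), p.request));
                } catch (RuntimeException ex) {
                    p.future.completeExceptionally(ex);  // surface transport errors to the caller
                }
                completed++;
            }
        }
        return completed;
    }
}
```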

--------- Handling the response ---------------

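Let's walk through the callback flow.

First, send() returns a future. What is this future?

It is a RequestFuture<ClientResponse>, the counterpart of the ClientRequest we sent. Let's look at the fields of ClientResponse.

The ClientResponse class: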

```java
public class ClientResponse {
    private final RequestHeader requestHeader;
    private final RequestCompletionHandler callback; // ★
    private final String destination;
    private final long receivedTimeMs;
    private final long latencyMs;
    private final boolean disconnected;
    private final UnsupportedVersionException versionMismatch;
    private final AbstractResponse responseBody;
    ...
}
```

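Note the field marked ★. Recall that the send() call had a tail: .compose(new GroupCoordinatorResponseHandler()).

This is where the two connect. The callback completes the future, and compose() then hands the response to GroupCoordinatorResponseHandler. So let's see how that handler processes it: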

```java
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
    ...
    if (error == Errors.NONE) { // no error
        synchronized (AbstractCoordinator.this) { // set the coordinator
            AbstractCoordinator.this.coordinator = new Node(
                    Integer.MAX_VALUE - findCoordinatorResponse.node().id(),
                    findCoordinatorResponse.node().host(),
                    findCoordinatorResponse.node().port());
            log.info("Discovered coordinator {}", coordinator);
            client.tryConnect(coordinator);
            heartbeat.resetTimeouts(time.milliseconds());
        }
        future.complete(null);
    } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
        future.raise(new GroupAuthorizationException(groupId));
    } else {
        log.debug("Group coordinator lookup failed: {}", error.message());
        future.raise(error);
    }
}
```
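The compose(...) tail turns the raw RequestFuture<ClientResponse> into a RequestFuture<Void> by running a handler like the one above. The same shape can be sketched with CompletableFuture.thenApply. The FindCoordinatorResult type and the string error codes are hypothetical simplifications.

The sketch keeps the Integer.MAX_VALUE - brokerId trick. Kafka's source comments explain that it gives the coordinator a separate connection in the network client, distinct from the regular connection to the same broker.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of adapting a raw response future into a typed result. */
final class CoordinatorDiscovery {

    /** Minimal stand-in for a FindCoordinator response. */
    static final class FindCoordinatorResult {
        final String error;   // "NONE" on success (simplified error code)
        final int nodeId;
        final String host;
        final int port;

        FindCoordinatorResult(String error, int nodeId, String host, int port) {
            this.error = error;
            this.nodeId = nodeId;
            this.host = host;
            this.port = port;
        }
    }

    volatile String coordinator;   // "connectionId@host:port" once discovered

    /** A distinct id so the coordinator gets its own connection to the same broker. */
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    /** Analogue of send(...).compose(new GroupCoordinatorResponseHandler()). */
    CompletableFuture<Void> handle(CompletableFuture<FindCoordinatorResult> raw) {
        return raw.thenApply(resp -> {
            if (!"NONE".equals(resp.error)) {
                // fails the returned future, like future.raise(error)
                throw new IllegalStateException("Group coordinator lookup failed: " + resp.error);
            }
            coordinator = coordinatorConnectionId(resp.nodeId) + "@" + resp.host + ":" + resp.port;
            return null;   // like future.complete(null)
        });
    }
}
```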

At this point the coordinator has been successfully initialized.

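1.2 Phase 2

Once the GroupCoordinator has been found, we enter phase 2: the consumer sends a JoinGroupRequest to the GroupCoordinator and handles the response.

The entry point is again the poll() method in ConsumerCoordinator: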

```java
public void poll(long now, long remainingMs) {
    ...
    if (needRejoin()) {
        if (subscriptions.hasPatternSubscription())
            client.ensureFreshMetadata();
        ensureActiveGroup(); // [step in] phase 2
        now = time.milliseconds();
    }
    ...
}
```

needRejoin() decides whether the consumer needs to (re)join:

```java
public boolean needRejoin() {
    if (!subscriptions.partitionsAutoAssigned()) // automatic partition assignment (subscribe) mode?
        return false;
    // we need to rejoin if we performed the assignment and metadata has changed
    if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
        return true;
    // we need to join if our subscription has changed since the last join
    if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
        return true;
    return super.needRejoin();
}
```
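needRejoin() boils down to a few comparisons. Below is a pure-function sketch. Its parameters are hypothetical stand-ins for the coordinator's fields: assignmentSnapshot, metadataSnapshot, joinedSubscription, and the parent class's rejoin flag returned by super.needRejoin(). Metadata snapshots are reduced to topic-to-partition-count maps.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical pure-function version of ConsumerCoordinator.needRejoin(). */
final class RejoinCheck {

    static boolean needRejoin(boolean partitionsAutoAssigned,
                              Map<String, Integer> assignmentSnapshot, // metadata the leader assigned with (null if not leader)
                              Map<String, Integer> metadataSnapshot,   // current topic -> partition count
                              Set<String> joinedSubscription,          // topics sent in the last JoinGroup (null if never joined)
                              Set<String> currentSubscription,
                              boolean rejoinRequested) {               // stands in for super.needRejoin()
        if (!partitionsAutoAssigned) {
            return false;   // manual assign(): no group management at all
        }
        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
            return true;    // we were the leader and metadata changed since we assigned
        }
        if (joinedSubscription != null && !joinedSubscription.equals(currentSubscription)) {
            return true;    // our subscription changed since the last join
        }
        return rejoinRequested;
    }
}
```

Only the leader keeps an assignmentSnapshot, so a partition-count change is detected by the leader, which then rejoins and forces a new rebalance for the whole group.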

Now suppose we need to rejoin and the GroupCoordinator has already been found. Let's keep tracing.

This takes us to AbstractCoordinator, the parent class of ConsumerCoordinator:

```java
public void ensureActiveGroup() {
    ensureCoordinatorReady();       // make sure the coordinator has been found
    startHeartbeatThreadIfNeeded(); // start the heartbeat thread
    joinGroupIfNeeded();            // join if needed
}
```

The second call starts the heartbeat thread; it is simple:

```java
private synchronized void startHeartbeatThreadIfNeeded() {
    if (heartbeatThread == null) {
        heartbeatThread = new HeartbeatThread();
        heartbeatThread.start();
    }
}
```
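The heartbeat thread started here wakes up periodically and checks two things: whether a heartbeat is due, and whether the session has expired. initiateJoinGroup() later disables the thread during a rebalance and re-enables it afterwards.

Below is a simplified sketch of that timing bookkeeping. It is loosely modeled on Kafka's Heartbeat class, but the class name and details are hypothetical, and the max.poll.interval.ms check is left out.

```java
/** Simplified heartbeat bookkeeping (hypothetical, loosely modeled on Kafka's Heartbeat). */
final class HeartbeatTimer {
    private final long sessionTimeoutMs;
    private final long heartbeatIntervalMs;
    private long lastHeartbeatSendMs;
    private long lastHeartbeatReceiveMs;
    private boolean enabled = true;   // disabled while a rebalance is in progress

    HeartbeatTimer(long sessionTimeoutMs, long heartbeatIntervalMs, long nowMs) {
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            throw new IllegalArgumentException("heartbeat interval must be below the session timeout");
        }
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        resetTimeouts(nowMs);
    }

    void enable()  { enabled = true; }
    void disable() { enabled = false; }

    void sentHeartbeat(long nowMs)    { lastHeartbeatSendMs = nowMs; }
    void receiveHeartbeat(long nowMs) { lastHeartbeatReceiveMs = nowMs; }

    /** Like resetTimeouts() after discovering the coordinator: treat "now" as the last contact. */
    void resetTimeouts(long nowMs) {
        lastHeartbeatSendMs = nowMs;
        lastHeartbeatReceiveMs = nowMs;
    }

    long timeToNextHeartbeat(long nowMs) {
        return Math.max(0, heartbeatIntervalMs - (nowMs - lastHeartbeatSendMs));
    }

    boolean shouldHeartbeat(long nowMs) {
        return enabled && timeToNextHeartbeat(nowMs) == 0;
    }

    /** No successful heartbeat response within the session timeout. */
    boolean sessionTimeoutExpired(long nowMs) {
        return nowMs - lastHeartbeatReceiveMs > sessionTimeoutMs;
    }
}
```

The interval is kept well below the session timeout so that several heartbeats can be lost before the coordinator gives up on the member.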

Now the main part, joinGroupIfNeeded():

```java
void joinGroupIfNeeded() {
    while (needRejoin() || rejoinIncomplete()) {
        ensureCoordinatorReady(); // make sure the GroupCoordinator is known
        if (needsJoinPrepare) {
            onJoinPrepare(generation.generationId, generation.memberId); // 1. preparation
            needsJoinPrepare = false;
        }
        RequestFuture<ByteBuffer> future = initiateJoinGroup(); // 2. [step in]
        client.poll(future); // block until the request completes
        if (future.succeeded()) {
            onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
            resetJoinGroupFuture();
            needsJoinPrepare = true;
        } else {
            resetJoinGroupFuture();
            RuntimeException exception = future.exception();
            if (exception instanceof UnknownMemberIdException ||
                    exception instanceof RebalanceInProgressException ||
                    exception instanceof IllegalGenerationException)
                continue;
            else if (!future.isRetriable())
                throw exception;
            time.sleep(retryBackoffMs);
        }
    }
}
```
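The else branch of joinGroupIfNeeded() sorts failures into three buckets: rejoin immediately, give up by throwing, or back off and retry. Below is a minimal sketch of that decision.

The nested exception classes are hypothetical stand-ins for Kafka's UnknownMemberIdException, RebalanceInProgressException, IllegalGenerationException and a retriable error. Only the classification logic follows the code above.

```java
/** Hypothetical sketch of how joinGroupIfNeeded() reacts to a failed JoinGroup round. */
final class JoinFailurePolicy {

    enum Action { REJOIN_NOW, BACK_OFF_AND_RETRY, THROW }

    /** Stand-ins for Kafka's exception types (not the real classes). */
    static class RetriableError extends RuntimeException {}
    static final class CoordinatorNotAvailable extends RetriableError {}
    static final class UnknownMemberId extends RuntimeException {}
    static final class RebalanceInProgress extends RuntimeException {}
    static final class IllegalGeneration extends RuntimeException {}

    static Action decide(RuntimeException e) {
        if (e instanceof UnknownMemberId
                || e instanceof RebalanceInProgress
                || e instanceof IllegalGeneration) {
            return Action.REJOIN_NOW;           // the "continue" branch: loop again right away
        }
        if (!(e instanceof RetriableError)) {
            return Action.THROW;                // non-retriable: propagate to the caller
        }
        return Action.BACK_OFF_AND_RETRY;       // time.sleep(retryBackoffMs), then loop
    }
}
```

The first three errors mean the member's view of the group is stale rather than that anything is broken. Rejoining at once, without a back-off, is therefore the cheapest way to catch up.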

First, step 1, the preparation method onJoinPrepare():

```java
protected void onJoinPrepare(int generation, String memberId) {
    // commit offsets prior to rebalance if auto-commit enabled
    maybeAutoCommitOffsetsSync(rebalanceTimeoutMs); // one synchronous offset commit
    // execute the user's callback before rebalance
    ConsumerRebalanceListener listener = subscriptions.listener();
    log.info("Revoking previously assigned partitions {}", subscriptions.assignedPartitions());
    try {
        Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsRevoked(revoked);
    } catch (WakeupException | InterruptException e) {
        throw e;
    } catch (Exception e) {
        log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
    }
    isLeader = false;
    subscriptions.resetGroupSubscription();
}
```

Step 2: ----- Creating and sending the request -----

```java
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    if (joinFuture == null) {
        disableHeartbeatThread(); // pause the heartbeat thread
        state = MemberState.REBALANCING;
        joinFuture = sendJoinGroupRequest(); // [step in] send
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                synchronized (AbstractCoordinator.this) {
                    log.info("Successfully joined group with generation {}", generation.generationId);
                    state = MemberState.STABLE;
                    rejoinNeeded = false;
                    if (heartbeatThread != null)
                        heartbeatThread.enable();
                }
            }

            @Override
            public void onFailure(RuntimeException e) {
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED;
                }
            }
        });
    }
    return joinFuture;
}
```
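initiateJoinGroup() drives a small state machine. The member moves from UNJOINED to REBALANCING while the request is outstanding. On success it becomes STABLE and heartbeats are re-enabled; on failure it drops back to UNJOINED.

The three state names come from the code above. The rest of this sketch (class name, methods, and the CompletableFuture standing in for the join future) is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of the member state transitions in initiateJoinGroup(). */
final class GroupMember {
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    private MemberState state = MemberState.UNJOINED;
    private boolean heartbeatEnabled = true;
    private boolean rejoinNeeded = true;
    private CompletableFuture<String> joinFuture;   // shared while a join is in flight

    synchronized CompletableFuture<String> initiateJoinGroup(CompletableFuture<String> sendResult) {
        if (joinFuture == null) {
            heartbeatEnabled = false;               // disableHeartbeatThread()
            state = MemberState.REBALANCING;
            joinFuture = sendResult;                // stands in for sendJoinGroupRequest()
            joinFuture.whenComplete((assignment, error) -> {
                synchronized (GroupMember.this) {
                    if (error == null) {
                        state = MemberState.STABLE;
                        rejoinNeeded = false;
                        heartbeatEnabled = true;    // heartbeatThread.enable()
                    } else {
                        state = MemberState.UNJOINED;   // heartbeats stay disabled
                    }
                }
            });
        }
        return joinFuture;
    }

    synchronized void resetJoinGroupFuture() { joinFuture = null; }
    synchronized MemberState state() { return state; }
    synchronized boolean heartbeatEnabled() { return heartbeatEnabled; }
    synchronized boolean rejoinNeeded() { return rejoinNeeded; }
}
```

Pausing heartbeats during REBALANCING matters because the heartbeat and JoinGroup responses would otherwise race over the same group state.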

Going deeper, the request is created here:

```java
private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();
    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
            groupId,
            this.sessionTimeoutMs,
            this.generation.memberId,
            protocolType(),
            metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
    return client.send(coordinator, requestBuilder)
            .compose(new JoinGroupResponseHandler());
}
```

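From here on, the pattern is the same as the request sending we discussed earlier: a callback (here JoinGroupResponseHandler) is attached to handle the response.

Fields of the JoinGroupRequest: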

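```java
private final String groupId;
private final int sessionTimeout;   // if no heartbeat arrives within this time, the consumer is considered offline
private final int rebalanceTimeout; // how long the coordinator waits for members to rejoin during a rebalance
private final String memberId;      // ID assigned to the consumer by the GroupCoordinator
private final String protocolType;  // the protocol implemented; "consumer" by default
private final List<ProtocolMetadata> groupProtocols; // the consumer's subscription, serialized for each supported PartitionAssignor
```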
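Each entry in groupProtocols pairs an assignor name (for example "range") with the member's subscription serialized into bytes. The sketch below encodes a subscription as three parts:

- an int16 version,
- a topic list (an int32 count, then int16-length-prefixed UTF-8 names),
- nullable user data (length -1 when absent).

This follows the general shape of Kafka's ConsumerProtocol version 0 layout, but treat the exact layout as an assumption rather than a byte-exact copy. The class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical subscription codec, shaped like (but not guaranteed identical to) ConsumerProtocol v0. */
final class SubscriptionCodec {
    static final short VERSION = 0;

    static ByteBuffer serialize(List<String> topics, byte[] userData) {
        int size = 2 + 4;                                     // version + topic count
        List<byte[]> encoded = new ArrayList<>();
        for (String t : topics) {
            byte[] b = t.getBytes(StandardCharsets.UTF_8);
            encoded.add(b);
            size += 2 + b.length;                             // int16 length prefix + bytes
        }
        size += 4 + (userData == null ? 0 : userData.length); // int32 length (-1 = null) + bytes
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort(VERSION);
        buf.putInt(encoded.size());
        for (byte[] b : encoded) {
            buf.putShort((short) b.length);
            buf.put(b);
        }
        if (userData == null) {
            buf.putInt(-1);
        } else {
            buf.putInt(userData.length);
            buf.put(userData);
        }
        buf.flip();                                           // ready for reading
        return buf;
    }

    static List<String> deserializeTopics(ByteBuffer buf) {
        ByteBuffer in = buf.duplicate();                      // leave the caller's position untouched
        short version = in.getShort();
        if (version != VERSION) {
            throw new IllegalArgumentException("unsupported version " + version);
        }
        int count = in.getInt();
        List<String> topics = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] b = new byte[in.getShort()];
            in.get(b);
            topics.add(new String(b, StandardCharsets.UTF_8));
        }
        return topics;                                        // user data left unread in this sketch
    }
}
```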

--------- Handling the response ---------------

Let's jump straight to the response handling, in the JoinGroupResponseHandler class:

```java
if (error == Errors.NONE) {
    log.debug("Received successful JoinGroup response: {}", joinResponse);
    sensors.joinLatency.record(response.requestLatencyMs());
    synchronized (AbstractCoordinator.this) {
        if (state != MemberState.REBALANCING) {
            // if the consumer was woken up before a rebalance completes, we may have already left
            // the group. In this case, we do not want to continue with the sync group.
            future.raise(new UnjoinedGroupException());
        } else { // the normal path
            AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
                    joinResponse.memberId(), joinResponse.groupProtocol());
            if (joinResponse.isLeader()) { // this consumer is the leader
                onJoinLeader(joinResponse).chain(future);
            } else { // follower
                onJoinFollower().chain(future);
            }
        }
    }
}
```

Fields of the JoinGroupResponse:

```java
private final int throttleTimeMs;
private final Errors error;
private final int generationId;    // generation assigned by the GroupCoordinator
private final String groupProtocol; // the assignor chosen by the GroupCoordinator
private final String memberId;      // ID assigned to this consumer by the GroupCoordinator
private final String leaderId;      // memberId of the leader
private final Map<String, ByteBuffer> members; // subscription info of each consumer (only sent to the leader)
```

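This is the key point. If the response designates this consumer as the leader, it must perform the partition assignment and send the result back to the server. A follower instead sends its own request to obtain the assignment result. Both use a SyncGroupRequest.

1. First, onJoinFollower(), the simpler of the two: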

```java
private RequestFuture<ByteBuffer> onJoinFollower() {
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
                    Collections.<String, ByteBuffer>emptyMap());
    return sendSyncGroupRequest(requestBuilder);
}
```
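The leader/follower split comes down to one decision. The member whose id equals leaderId (presumably what joinResponse.isLeader() checks) sends the full assignment in its SyncGroupRequest; everyone else sends an empty map.

Below is a hypothetical sketch of that choice. The assign function stands in for performAssignment().

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of choosing the SyncGroupRequest payload after a JoinGroup response. */
final class SyncPayload {

    static Map<String, ByteBuffer> groupAssignmentToSend(
            String memberId,
            String leaderId,
            Map<String, ByteBuffer> members,                              // only filled in for the leader
            Function<Map<String, ByteBuffer>, Map<String, ByteBuffer>> assign) {
        boolean isLeader = memberId.equals(leaderId);
        if (isLeader) {
            return assign.apply(members);     // onJoinLeader(): performAssignment(...)
        }
        return Collections.emptyMap();        // onJoinFollower(): nothing to contribute
    }
}
```

Putting the assignment logic on a client (the leader) rather than the broker lets users plug in custom assignors without changing the broker.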

2. onJoinLeader()

It is almost the same as onJoinFollower(), except that its request carries the partition assignment result, while the follower's request carries an empty map.

```java
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                joinResponse.members());
        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}
```

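We covered the actual partition assignment in Consumer (1). It goes through AbstractPartitionAssignor's assign() method, which calls the assign() method of the configured strategy to produce the final assignment.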
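For reference, here is the core of a range-style assignment, the strategy behind Kafka's default RangeAssignor in these versions:

1. For each topic, sort the subscribed members.
2. Give each member partitions / members partitions.
3. Hand the remainder, one each, to the first few members.

This is a self-contained re-implementation for illustration, not the library code. It works on plain member-id and "topic-partition" strings instead of Kafka's types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative range assignment (modeled on the idea behind RangeAssignor, not its source). */
final class RangeAssignment {

    /**
     * @param partitionsPerTopic topic -> number of partitions
     * @param subscriptions      memberId -> subscribed topics
     * @return memberId -> list of "topic-partition" strings
     */
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        Map<String, List<String>> assignment = new TreeMap<>();
        subscriptions.keySet().forEach(m -> assignment.put(m, new ArrayList<>()));

        // invert the subscriptions: topic -> members subscribed to it
        Map<String, List<String>> membersPerTopic = new HashMap<>();
        subscriptions.forEach((member, topics) ->
                topics.forEach(t -> membersPerTopic.computeIfAbsent(t, k -> new ArrayList<>()).add(member)));

        membersPerTopic.forEach((topic, members) -> {
            Integer numPartitions = partitionsPerTopic.get(topic);
            if (numPartitions == null) return;          // topic unknown to metadata: skip it
            members.sort(null);                         // deterministic order by member id
            int perMember = numPartitions / members.size();
            int withExtra = numPartitions % members.size();
            for (int i = 0; i < members.size(); i++) {
                int start = perMember * i + Math.min(i, withExtra);
                int length = perMember + (i < withExtra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    assignment.get(members.get(i)).add(topic + "-" + p);
                }
            }
        });
        return assignment;
    }
}
```

With 7 partitions and 3 consumers, the first consumer gets partitions 0 to 2 and the other two get two partitions each. Because the split is done per topic, the first consumers in sort order can end up with noticeably more partitions when many topics are involved.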

Now let's unwind the stack and follow the result back up.

Once the future completes successfully, the first stop is the listener registered in initiateJoinGroup() (both snippets here have appeared earlier):

```java
public void onSuccess(ByteBuffer value) {
    synchronized (AbstractCoordinator.this) {
        state = MemberState.STABLE;
        rejoinNeeded = false;
        if (heartbeatThread != null)
            heartbeatThread.enable();
    }
}
```

Then control returns to joinGroupIfNeeded(), which calls onJoinComplete():

```java
if (future.succeeded()) {
    onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
    resetJoinGroupFuture();
    needsJoinPrepare = true;
}
```

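onJoinComplete() is also very important; we will cover it in phase 3.

1.3 Phase 3

After the partition assignment is done, the Synchronizing Group State phase begins. Its main logic is to send a SyncGroupRequest to the GroupCoordinator and handle the SyncGroupResponse.

As we saw, both the leader and the followers send this request. The sending logic is simple, so we focus on how the response is handled. Here is the handler: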

```java
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
    @Override
    public void handle(SyncGroupResponse syncResponse,
                       RequestFuture<ByteBuffer> future) {
        Errors error = syncResponse.error();
        if (error == Errors.NONE) {
            sensors.syncLatency.record(response.requestLatencyMs());
            // propagate the partition assignment result
            future.complete(syncResponse.memberAssignment());
        } else {
            ... // error handling omitted here
        }
    }
}
```

The place that actually processes the result is onJoinComplete(), reached through the listener chain described above. It lives in ConsumerCoordinator:

```java
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    if (!isLeader)
        assignmentSnapshot = null;
    // look up the assignment strategy the coordinator selected
    PartitionAssignor assignor = lookupAssignor(assignmentStrategy); // does it exist locally?
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
    // 2. deserialize the assignment
    Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
    // set needsFetchCommittedOffsets to true so the last committed offsets are fetched from the server
    subscriptions.needRefreshCommits();
    // fill in the assigned partitions
    subscriptions.assignFromSubscribed(assignment.partitions());
    Set<String> addedTopics = new HashSet<>();
    for (TopicPartition tp : subscriptions.assignedPartitions()) {
        if (!joinedSubscription.contains(tp.topic()))
            addedTopics.add(tp.topic());
    }
    if (!addedTopics.isEmpty()) {
        Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
        Set<String> newJoinedSubscription = new HashSet<>(joinedSubscription);
        newSubscription.addAll(addedTopics);
        newJoinedSubscription.addAll(addedTopics);
        this.subscriptions.subscribeFromPattern(newSubscription);
        this.joinedSubscription = newJoinedSubscription;
    }
    this.metadata.setTopics(subscriptions.groupSubscription());
    client.ensureFreshMetadata();
    // assignor callback
    assignor.onAssignment(assignment);
    this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
    ConsumerRebalanceListener listener = subscriptions.listener();
    log.info("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
    try {
        Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
        listener.onPartitionsAssigned(assigned);
    } catch (WakeupException | InterruptException e) {
        throw e;
    } catch (Exception e) {
        log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
    }
}
```
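onJoinComplete() starts by turning assignmentBuffer back into partitions. Below is a hypothetical codec for that payload: an int16 version, then per-topic int32 partition lists, then nullable user data. Like a subscription, it is shaped after Kafka's ConsumerProtocol v0 but should not be taken as byte-exact.

The sketch also computes the addedTopics set the way the loop above does: assigned topics that were not part of the last JoinGroup subscription. This can happen with pattern subscriptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical assignment codec plus the "added topics" check from onJoinComplete(). */
final class AssignmentCodec {

    static ByteBuffer serialize(Map<String, List<Integer>> partitionsByTopic) {
        int size = 2 + 4;                                             // version + topic count
        for (Map.Entry<String, List<Integer>> e : partitionsByTopic.entrySet()) {
            size += 2 + e.getKey().getBytes(StandardCharsets.UTF_8).length // topic name
                    + 4 + 4 * e.getValue().size();                    // partition count + int32 each
        }
        size += 4;                                                    // user data length (-1 = none)
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort((short) 0);
        buf.putInt(partitionsByTopic.size());
        for (Map.Entry<String, List<Integer>> e : partitionsByTopic.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) name.length).put(name);
            buf.putInt(e.getValue().size());
            for (int p : e.getValue()) {
                buf.putInt(p);
            }
        }
        buf.putInt(-1);
        buf.flip();
        return buf;
    }

    static Map<String, List<Integer>> deserialize(ByteBuffer buffer) {
        ByteBuffer in = buffer.duplicate();                           // leave the caller's position untouched
        short version = in.getShort();
        if (version != 0) throw new IllegalArgumentException("unsupported version " + version);
        int topicCount = in.getInt();
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < topicCount; i++) {
            byte[] name = new byte[in.getShort()];
            in.get(name);
            int n = in.getInt();
            List<Integer> partitions = new ArrayList<>(n);
            for (int j = 0; j < n; j++) partitions.add(in.getInt());
            result.put(new String(name, StandardCharsets.UTF_8), partitions);
        }
        return result;                                                // trailing user data ignored here
    }

    /** Topics we were assigned but did not list in our last JoinGroup. */
    static Set<String> addedTopics(Set<String> assignedTopics, Set<String> joinedSubscription) {
        Set<String> added = new TreeSet<>(assignedTopics);
        added.removeAll(joinedSubscription);
        return added;
    }
}
```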

 

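Disclaimer: This article was contributed by a user and does not represent the views of the wpsshop blog; copyright belongs to the original author, and the site assumes no legal responsibility for it. If you find infringing content, please contact us. When reposting, please credit the source: https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/125512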