赞
踩
在前面我们讲过,KafkaProducer是线程安全的,同时其内部还有一个Sender,开了一个后台线程,不断从队列中取消息进行发送。
而consumer,是一个纯粹的单线程程序,后面所讲的所有机制,包括coordinator,rebalance, heartbeat等,都是在这个单线程的poll函数里面完成的。也因此,在consumer的代码内部,没有锁的出现。
<code class="hljs lasso has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;"><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//客户端线程</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">while</span> (<span class="hljs-literal" style="color: rgb(0, 102, 102); box-sizing: border-box;">true</span>) { ConsumerRecords<span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;"><</span><span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">String</span>, <span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">String</span><span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">></span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">records</span> <span class="hljs-subst" style="color: rgb(0, 0, 0); box-sizing: border-box;">=</span> consumer<span class="hljs-built_in" style="color: rgb(102, 0, 102); box-sizing: border-box;">.</span>poll(<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">100</span>); 。。。 }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li></ul>
在0.9以前的client api中,consumer是要依赖Zookeeper的。因为同一个consumer group中的所有consumer需要进行协同,进行下面所讲的rebalance。
但是因为zookeeper的“herd”与“split brain”,导致一个group里面,不同的consumer拥有了同一个partition,进而会引起消息的消费错乱。为此,在0.9中,不再用zookeeper,而是Kafka集群本身来进行consumer之间的同步。下面引自kafka设计的原文:
<code class="hljs oxygene has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">The current version <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">of</span> the high level consumer suffers <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">from</span> herd <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">and</span> split brain problems, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">where</span> multiple consumers <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">in</span> a <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">group</span> run a distributed algorithm <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">to</span> agree <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">on</span> the same partition ownership decision. Due <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">to</span> different view <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">of</span> the zookeeper data, they run <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">into</span> conflicts that makes the rebalancing attempt fail. But there <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">is</span> no way <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">for</span> a consumer <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">to</span> verify <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> a rebalancing operation completed successfully <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">on</span> the entire <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">group</span>. This also leads <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">to</span> some potential bugs <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">in</span> the rebalancing logic, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">for</span> example, https:<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//issues.apache.org/jira/browse/KAFKA-242</span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
我们知道,对于属于不同consumer group的consumers,可以消费同1个partition,从而实现Pub/Sub模式。
但是在一个group内部,是不允许多个consumer消费同一个partition的,这也就意味着,对于1个topic,1个group来说, 其partition数目 >= consumer个数。
比如对于1个topic,有4个partition,那么在一个group内部,最多只能有4个consumer。你加入更多的consumer,它们也不会分配到partition。
那为什么要做这个限制呢?原因在下面这篇文章中,有详细阐述:
http://stackoverflow.com/questions/25896109/in-apache-kafka-why-cant-there-be-more-consumer-instances-than-partitions
简单来说,一个是因为这样做就没办法保证同1个partition消息的时序;另1方面,Kafka的服务器,是每个topic的每个partition的每个consumer group对应1个offset,即(topic, partition, consumer_group_id) – offset。如果多个consumer并行消费同1个partition,那offset的confirm就会出问题。
知道了这个前提,下面我们就来分析partition的分配问题。
问题的提出:
给定一个topic,有4个partition: p0, p1, p2, p3, 一个group有3个consumer: c0, c1, c2。那么,如果按范围分配策略,分配结果是:
c0: p0, c1: p1, c2: p2, p3
如果按轮询分配策略:
c0: p1, p3, c1: p1, c2: p2
那这整个分配过程是如何进行的呢?见下图所示:
步骤1:对于每1个consumer group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。因此,第1步就是找到这个coordinator。
<code class="hljs java has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;"> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> Map<TopicPartition, List<ConsumerRecord<K, V>>> <span class="hljs-title" style="box-sizing: border-box;">pollOnce</span>(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">long</span> timeout) { coordinator.ensureCoordinatorKnown(); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//poll函数的第1行,就是寻找coordinator。如果没找到,就会一直阻塞在这里</span> ... } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">ensureCoordinatorKnown</span>() { <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">while</span> (coordinatorUnknown()) { RequestFuture<Void> future = sendGroupMetadataRequest(); client.poll(future); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (future.failed()) { <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (future.isRetriable()) client.awaitMetadataUpdate(); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">else</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throw</span> future.exception(); } } } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> RequestFuture<Void> <span class="hljs-title" style="box-sizing: border-box;">sendGroupMetadataRequest</span>() { Node node = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.client.leastLoadedNode(); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (node == <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">null</span>) { <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span> RequestFuture.noBrokersAvailable(); } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">else</span> { GroupCoordinatorRequest metadataRequest = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> GroupCoordinatorRequest(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.groupId); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//向集群中负载最小的node,发送请求,询问这个group id对应的coordinator是谁</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span> client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest) .compose(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> RequestFutureAdapter<ClientResponse, Void>() { <span class="hljs-annotation" style="color: rgb(155, 133, 157); box-sizing: border-box;">@Override</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">onSuccess</span>(ClientResponse response, RequestFuture<Void> future) { handleGroupMetadataResponse(response, future); } }); } }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li></ul>
步骤2:找到coordinator之后,发送JoinGroup请求
<code class="hljs java has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;"> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> Map<TopicPartition, List<ConsumerRecord<K, V>>> <span class="hljs-title" style="box-sizing: border-box;">pollOnce</span>(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">long</span> timeout) { coordinator.ensureCoordinatorKnown(); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//步骤1:寻找coordinator</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (subscriptions.partitionsAutoAssigned()) coordinator.ensurePartitionAssignment(); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//步骤2+3: JoinGroup + SyncGroup</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">ensureActiveGroup</span>() { <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (!needRejoin()) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span>; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (needsJoinPrepare) { onJoinPrepare(generation, memberId); needsJoinPrepare = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">false</span>; } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">while</span> (needRejoin()) { ensureCoordinatorKnown(); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (client.pendingRequestCount(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.coordinator) > <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span>) { client.awaitPendingRequests(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.coordinator); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">continue</span>; } RequestFuture<ByteBuffer> future = performGroupJoin(); client.poll(future); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (future.succeeded()) { onJoinComplete(generation, memberId, protocol, future.value()); needsJoinPrepare = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">true</span>; heartbeatTask.reset(); } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">else</span> { RuntimeException exception = future.exception(); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (exception <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">instanceof</span> UnknownMemberIdException || exception <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">instanceof</span> RebalanceInProgressException || exception <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">instanceof</span> IllegalGenerationException) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">continue</span>; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">else</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (!future.isRetriable()) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throw</span> exception; time.sleep(retryBackoffMs); } } }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li><li style="box-sizing: border-box; padding: 0px 5px;">38</li><li style="box-sizing: border-box; padding: 0px 5px;">39</li><li style="box-sizing: border-box; padding: 0px 5px;">40</li><li style="box-sizing: border-box; padding: 0px 5px;">41</li><li style="box-sizing: border-box; padding: 0px 5px;">42</li></ul>
步骤3:JoinGroup返回之后,发送SyncGroup,得到自己所分配到的partition
<code class="hljs axapta has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;"> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> RequestFuture<ByteBuffer> performGroupJoin() { <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (coordinatorUnknown()) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span> RequestFuture.coordinatorNotAvailable(); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// send a join group request to the coordinator</span> log.debug(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"(Re-)joining group {}"</span>, groupId); JoinGroupRequest request = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> JoinGroupRequest( groupId, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.sessionTimeoutMs, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.memberId, protocolType(), metadata()); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// create the request for the coordinator</span> log.debug(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"Issuing request ({}: {}) to coordinator {}"</span>, ApiKeys.JOIN_GROUP, request, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.coordinator.id()); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">client</span>.send(coordinator, ApiKeys.JOIN_GROUP, request) .compose(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> JoinGroupResponseHandler()); } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> <span class="hljs-class" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">class</span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">JoinGroupResponseHandler</span> <span class="hljs-inheritance" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">extends</span></span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">CoordinatorResponseHandler</span><<span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">JoinGroupResponse</span>, <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">ByteBuffer</span>> {</span> @Override <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> JoinGroupResponse parse(ClientResponse response) { <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> JoinGroupResponse(response.responseBody()); } @Override <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) { <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// process the response</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">short</span> errorCode = joinResponse.errorCode(); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (errorCode == Errors.NONE.code()) { log.debug(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"Joined group: {}"</span>, joinResponse.toStruct()); AbstractCoordinator.<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.memberId = joinResponse.memberId(); AbstractCoordinator.<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.generation = joinResponse.generationId(); AbstractCoordinator.<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.rejoinNeeded = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">false</span>; AbstractCoordinator.<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>.protocol = joinResponse.groupProtocol(); sensors.joinLatency.record(response.requestLatencyMs()); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (joinResponse.isLeader()) { onJoinLeader(joinResponse).chain(future); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//关键点:在JoinGroup返回之后,竟跟着发送SyncGroup消息</span> } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">else</span> { onJoinFollower().chain(future); } } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">else</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) { 。。。 } } }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li><li style="box-sizing: border-box; padding: 0px 5px;">38</li><li style="box-sizing: border-box; padding: 0px 5px;">39</li><li style="box-sizing: border-box; padding: 0px 5px;">40</li><li style="box-sizing: border-box; padding: 0px 5px;">41</li><li style="box-sizing: border-box; padding: 0px 5px;">42</li><li style="box-sizing: border-box; padding: 0px 5px;">43</li><li style="box-sizing: border-box; padding: 0px 5px;">44</li><li style="box-sizing: border-box; padding: 0px 5px;">45</li><li style="box-sizing: border-box; padding: 0px 5px;">46</li><li style="box-sizing: border-box; padding: 0px 5px;">47</li><li style="box-sizing: border-box; padding: 0px 5px;">48</li></ul>
注意,在上面3步中,有一个关键点: partition的分配策略和分配结果其实是由client决定的,而不是由coordinator决定的。什么意思呢?在第2步,所有consumer都往coordinator发送JoinGroup消息之后,coordinator会指定其中一个consumer作为leader,其他consumer作为follower。
然后由这个leader进行partition分配。然后在第3步,leader通过SyncGroup消息,把分配结果发给coordinator,其他consumer也发送SyncGroup消息,获得这个分配结果。
为什么要在consumer中选一个leader出来,进行分配,而不是由coordinator直接分配呢?关于这个, Kafka的官方文档有详细的分析。其中一个重要原因是为了灵活性:如果让server分配,一旦需要新的分配策略,server集群要重新部署,这对于已经在线上运行的集群来说,代价是很大的;而让client分配,server集群就不需要重新部署了。
所谓rebalance,就是在某些条件下,partition要在consumer中重新分配。那哪些条件下,会触发rebalance呢?
条件1:有新的consumer加入
条件2:旧的consumer挂了
条件3:coordinator挂了,集群选举出新的coordinator
条件4:topic的partition新加
条件5:consumer调用unsubscrible(),取消topic的订阅
当consumers检测到要rebalance时,所有consumer都会重走上面的流程,进行步骤2 + 步骤3: JoinGroup + SyncGroup。
可问题是: 当一个consumer挂了,或者有新的consumer加入,其他consumers怎么知道要进行rebalance呢? 答案就是下面的heartbeat。
每一个consumer都会定期的往coordinator发送heartbeat消息,一旦coordinator返回了某个特定的error code:ILLEGAL_GENERATION, 就说明之前的group无效了(解散了),要重新进行JoinGroup + SyncGroup操作。
那这个定期发送如何实现呢?一个直观的想法就是开一个后台线程,定时发送heartbeat消息,但维护一个后台线程,很显然会增大实现的复杂性。上面也说了, consumer是单线程程序。在这里是通过DelayedQueue来实现的。
其基本思路是把HeartBeatRequest放入一个DelayedQueue中,然后在while循环的poll中,每次从DelayedQueue中把请求拿出来发送出去(只有时间到了,Task才能从Queue中拿出来)。
<code class="hljs java has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;"> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> <span class="hljs-class" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">class</span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">HeartbeatTask</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">implements</span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">DelayedTask</span> {</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">boolean</span> requestInFlight = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">false</span>; <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//关键变量:判断当前是否有发送出去的HeartBeatRequest,其Response还没回来</span> <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//reset本质就是发送函数</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">reset</span>() { <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">long</span> now = time.milliseconds(); heartbeat.resetSessionTimeout(now); client.unschedule(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (!requestInFlight) client.schedule(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>, now); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//如果没有requestInFlight,则把HeartBeatTask放入DelayedQueue中</span> } <span class="hljs-annotation" style="color: rgb(155, 133, 157); box-sizing: border-box;">@Override</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">run</span>(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">final</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">long</span> now) { <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (generation < <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span> || needRejoin() || coordinatorUnknown()) { <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span>; } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (heartbeat.sessionTimeoutExpired(now)) { coordinatorDead(); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">return</span>; } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (!heartbeat.shouldHeartbeat(now)) { client.schedule(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>, now + heartbeat.timeToNextHeartbeat(now)); } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">else</span> { heartbeat.sentHeartbeat(now); requestInFlight = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">true</span>; RequestFuture<Void> future = sendHeartbeatRequest(); future.addListener(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> RequestFutureListener<Void>() { <span class="hljs-annotation" style="color: rgb(155, 133, 157); box-sizing: border-box;">@Override</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">onSuccess</span>(Void value) { requestInFlight = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">false</span>; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">long</span> now = time.milliseconds(); heartbeat.receiveHeartbeat(now); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">long</span> nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//放入delayedQueue</span> client.schedule(HeartbeatTask.<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>, nextHeartbeatTime); } <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//hearbeat返回之后,无论response成功,还是失败,把下1个heartbeat放入delayedQueue,从而形成循环间隔发送</span> <span class="hljs-annotation" style="color: rgb(155, 133, 157); box-sizing: border-box;">@Override</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">onFailure</span>(RuntimeException e) { requestInFlight = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">false</span>; client.schedule(HeartbeatTask.<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>, time.milliseconds() + retryBackoffMs); } }); } }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li><li style="box-sizing: border-box; padding: 0px 5px;">38</li><li style="box-sizing: border-box; padding: 0px 5px;">39</li><li style="box-sizing: border-box; padding: 0px 5px;">40</li><li style="box-sizing: border-box; padding: 0px 5px;">41</li><li style="box-sizing: border-box; padding: 0px 5px;">42</li><li style="box-sizing: border-box; padding: 0px 5px;">43</li><li style="box-sizing: border-box; padding: 0px 5px;">44</li><li style="box-sizing: border-box; padding: 0px 5px;">45</li><li style="box-sizing: border-box; padding: 0px 5px;">46</li><li style="box-sizing: border-box; padding: 0px 5px;">47</li><li style="box-sizing: border-box; padding: 0px 5px;">48</li><li style="box-sizing: border-box; padding: 0px 5px;">49</li><li style="box-sizing: border-box; padding: 0px 5px;">50</li><li style="box-sizing: border-box; padding: 0px 5px;">51</li><li style="box-sizing: border-box; padding: 0px 5px;">52</li></ul>
首先一点说明:个人认为这里的网络框架,封装的有点冗余:sendHeartbeatRequest既有callback机制(CompleteHandler),又为其Future加了Listener机制(上面的代码)。
也就是在heartbeat的completeHandler中,完成了rebalance的检测:从下面代码可以看出,对于以下的response error code,都会触发rebalance:
- * GROUP_COORDINATOR_NOT_AVAILABLE (15)
- * NOT_COORDINATOR_FOR_GROUP (16)
- * ILLEGAL_GENERATION (22)
- * UNKNOWN_MEMBER_ID (25)
- * REBALANCE_IN_PROGRESS (27)
- * GROUP_AUTHORIZATION_FAILED (30)
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;"> public RequestFuture<Void> sendHeartbeatRequest() { HeartbeatRequest req = new HeartbeatRequest(this<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.groupId</span>, this<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.generation</span>, this<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.memberId</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> return client<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.send</span>(coordinator, ApiKeys<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.HEARTBEAT</span>, req) <span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.compose</span>(new HeartbeatCompletionHandler())<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> { @Override public HeartbeatResponse parse(ClientResponse response) { return new HeartbeatResponse(response<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.responseBody</span>())<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) { sensors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.heartbeatLatency</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.record</span>(response<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.requestLatencyMs</span>())<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> short errorCode = heartbeatResponse<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.errorCode</span>()<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> if (errorCode == Errors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.NONE</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.code</span>()) { log<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.debug</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"Received successful heartbeat response."</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> future<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.complete</span>(null)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } else if (errorCode == Errors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.GROUP</span>_COORDINATOR_NOT_AVAILABLE<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.code</span>() || errorCode == Errors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.NOT</span>_COORDINATOR_FOR_GROUP<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.code</span>()) { log<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.info</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead."</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> coordinatorDead()<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> future<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.raise</span>(Errors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.forCode</span>(errorCode))<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } else if (errorCode == Errors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.REBALANCE</span>_IN_PROGRESS<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.code</span>()) { log<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.info</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"Attempt to heart beat failed since the group is rebalancing, try to re-join group."</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> AbstractCoordinator<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.this</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.rejoinNeeded</span> = true<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> future<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.raise</span>(Errors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.REBALANCE</span>_IN_PROGRESS)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } else if (errorCode == Errors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.ILLEGAL</span>_GENERATION<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.code</span>()) { log<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.info</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"Attempt to heart beat failed since generation id is not legal, try to re-join group."</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> AbstractCoordinator<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.this</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.rejoinNeeded</span> = true<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> future<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.raise</span>(Errors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.ILLEGAL</span>_GENERATION)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } else if (errorCode == Errors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.UNKNOWN</span>_MEMBER_ID<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.code</span>()) { log<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.info</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"Attempt to heart beat failed since member id is not valid, reset it and try to re-join group."</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> memberId = JoinGroupRequest<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.UNKNOWN</span>_MEMBER_ID<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> AbstractCoordinator<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.this</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.rejoinNeeded</span> = true<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> future<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.raise</span>(Errors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.UNKNOWN</span>_MEMBER_ID)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } else if (errorCode == Errors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.GROUP</span>_AUTHORIZATION_FAILED<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.code</span>()) { future<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.raise</span>(new GroupAuthorizationException(groupId))<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } else { future<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.raise</span>(new KafkaException(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"Unexpected errorCode in heartbeat response: "</span> + Errors<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.forCode</span>(errorCode)<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.exception</span>()<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getMessage</span>()))<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } } }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li><li style="box-sizing: border-box; padding: 0px 5px;">38</li><li style="box-sizing: border-box; padding: 0px 5px;">39</li><li style="box-sizing: border-box; padding: 0px 5px;">40</li><li style="box-sizing: border-box; padding: 0px 5px;">41</li><li style="box-sizing: border-box; padding: 0px 5px;">42</li><li style="box-sizing: border-box; padding: 0px 5px;">43</li><li style="box-sizing: border-box; padding: 0px 5px;">44</li><li style="box-sizing: border-box; padding: 0px 5px;">45</li></ul>
关键点:这里所谓的触发,其实就是把rejoinNeeded置为了true。然后在下一次poll循环的时候,检测到rejoinNeeded为true,就会重走上面的步骤2 + 步骤3
对于这整个体系来说,consumer可能挂掉,coordinator也可能挂掉。因此双方需要互相检测,对方是否挂了。
检测方法同样是上面的heartbeat:当consumer发现heartbeat返回超时,或者coordinator很久没有收到heartbeat,就会认为对方挂了。
当然,这会有“误操作”,比如consumer处理消息很慢(因为是单线程),导致下1次heartbeat迟迟没有发出去。这个时候coordinator就会认为该consumer挂了,会主动断开连接。从而触发1次rebalance。
就会从上面的步骤1开始,重新discovery coordinator,然后JoinGroup + SyncGroup
从上面的步骤2开始,通知其他所有剩下的consumers,进行JoinGroup + SyncGroup
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。