当前位置:   article > 正文

8、关于Coordinator

joingroup failed: this is not the correct coordinator. marking coordinator u

参考链接:https://www.jianshu.com/p/f01f5f0309a9

一、旧版本Scala消费者客户端的缺陷

在kafka0.9及以前版本的consumer会在zookeeper上/consumers/groupId/ids、/consumers/groupId/topics、/consumers/groupId/owners下注册watch。一旦有变化,所有的consumer都得到通知,都进行rebanlace操作。

这种方式有几种缺陷:

  1. 1、zk压力很大
  2. 2、羊群效应,就是大量watch需要通知,可能会导致其他任务阻塞
  3. 3、脑裂效应,所有consumer都接收到通知,进行rebanlance,相互之间没法控制。

所以在kafka0.9版本引入了协调器Coordinator

二、协调器分类及功能

每个broker启动的时候都会创建一个GroupCoordinator,每个客户端都有一个ConsumerCoordinator协调器。 ConsumerCoordinator每间隔3S中就会和GroupCoordinator保持心跳,如果超时没有发送,并且再过Zookeeper.time.out = 10S中则会触发rebanlance

1、GroupCoordinator
  1. 1、接受ConsumerCoordinator的JoinGroupRequest请求
  2. 2、选择一个consumer作为leader,一般第一个会作为leader
  3. 3、和consumer保持心跳,默认3S中,参数是:heartbeat.interval.ms
  4. 4、将consumer commit过来的offset保存在__consumer_offsets中
  5. 这是内部topic,还有一个是:__transaction_state
  6. 这个topic默认50个分区,1个副本。副本数一般要修改为3,增加高可用性
2、ConsumerCoordinator
  1. 1、向GroupCoordinator发送JoinGroupRequest请求加入group
  2. 2、定时发送心跳给GroupCoordinator,默认是3s钟
  3. 3、发送SyncGroup请求(携带自己的分配策略),得到自己所分配到的partition
  4. 3、向GroupCoordinator定时发送offset偏移量,默认5Scommit一次
  5. 4、如果是leader Consumer,还要负责分区分配工作,两种分配算法:range和RoundRobin。
3、consumer如何找到GroupCoordinator在哪里?

  1. 1、consumer向任何一个broker发送groupCoordinator发现请求,并携带上groupID
  2. 2、broker通过groupId.hash % __consumer_offsets的partitions数量(默认50个分区)得到分区id、
  3. 此分区leader所在broker就是此groupCoordinator所在broker。
  4. 3、consumerCoordinator找到GroupCoordinator之后,发送JoinGroupRequest请求加入group
  5. 3、GroupCoordinator调用handleJoinGroup方法处理请求。
4、topic ==> consumeroffsets不可用导致没法找到分区对应leader,也就找不到GroupCoordinator异常

主要是__consumer_offsets默认副本数是1,一旦集群上有broker失效,则此broker上__consumer_offsets对应的分区都没法选择leader,因为没有其他副本,导致就找不到对应的GroupCoordinator异常

  1. util.AlterKafkaConsumerOffset$: 元信息topic:t_swt,partition:1,0
  2. Could not fetch offset for [t_table,23] due to org.apache.kafka.common.errors.NotCoordinatorForGroupException:
  3. This is not the correct coordinator for this group.
5、分区再均衡rebanlance和groupCoordinator重新发现

首先consumer是单线程的,发送joinGroup加入组,成功后发送SyncGroup请求获取自己分配到的partition。最后通过dealyqueue来实现3S钟的定时心跳。

  • 1、rebanlance的条件

    1. 新consumer加入
    2. topic 分区增加
    3. consumer失联

    这里面比较复杂的是consumer失联,consumer挂掉或者consumer负载太高导致他不能正常和groupCoordinator保持3S中心跳,在等待session.time.out=10S还没有心跳,则会触发rebanlance。

    此时其他正常的consumer在heartbeat时就会收到GroupCoordinator的response illegal_generation(非法代),此时consumer就知道groupCoordinator正在进行renalancing,则会重新发送JoinGroup请求加入集群,加入成功后发送SyncGroup获取自己负责的partition信息。

    同时会打印日志:attempt to hearbeat failed since group is rebalancing

    所以:要注意consumer的负载,如果GC太频繁就很容易出现这个问题

  • 2、日志中频繁打印发现Groupcoordinator的分析

    1. AppInfoParser: Kafka version : 0.10.1.2.6.4.0-91
    2. AppInfoParser: Kafka commitId : ba0af6800a08d2f8
    3. AbstractCoordinator: Discovered coordinator kafka-rzx2.bigdata.com:6667 (id: 2147482644 rack: null) for group topic_streaming5.
    4. ConsumerCoordinator: Revoking previously assigned partitions [] for group topic_streaming5
    5. AbstractCoordinator: (Re-)joining group topic_streaming5
    6. AbstractCoordinator: Successfully joined group topic_streaming5 with generation 3
    7. ConsumerCoordinator: Setting newly assigned partitions [wl_002-1, wl_002-0, wl_002-2] for group topic_streaming5
    • 2.1、新的consumer加入group,一般是那种有重试机制的框架,比如

      spark-streaming重新启动task任务加入group

      confluent-connect重新启动task加入group

      新的task顶替失败的task,重新加入group,所以他第一步是先发现group协调器,然后join

    • 2.2、groupCoordinator挂了

      当consumer发送心跳给GroupCoordinator没有响应,则会认为group失败,此时则会重新查找groupCoordinator,然后join。

      GroupCoordinator失效是一个很严重的问题,说明某个GroupID对应的consumer非常多,导致太繁忙没法及时反应,或者集群问题导致__consumer_offsets的分区leader重新选举也会有这样的。比如spark-streaming有很多任务,每个任务的groupID都是一样的,每个任务搞了几个executor。这种情况需要拆分为几个groupID,自己负责自己的就行了

      注意:会先打印组协调器失败了coordinator deal, 然后再打印

      1. AppInfoParser: Kafka version : 0.10.1.2.6.4.0-91
      2. AppInfoParser: Kafka commitId : ba0af6800a08d2f8
      3. AbstractCoordinator: Discovered coordinator kafka-rzx2.bigdata.com:6667 (id: 2147482644 rack: null) for group topic_streaming5.
      4. ConsumerCoordinator: Revoking previously assigned partitions [] for group topic_streaming5
      5. AbstractCoordinator: (Re-)joining group topic_streaming5
      6. AbstractCoordinator: Successfully joined group topic_streaming5 with generation 3
      7. ConsumerCoordinator: Setting newly assigned partitions [wl_002-1, wl_002-0, wl_002-2] for group topic_streaming5

三、一些重要日志

  • 1、加入group

  • 2、如果GroupCoordinator正在进行rebanlance,则consumer打印hearbeat failed

  • 3、离开group

  • 4、发现groupCoordinator

    task刚上来的时候会打印,groupCoordinator失败时所有consumer也会重新发现组协调器,同时会打印coordinator deal

  • 5、consumeroffsets分区leader无法找到,一般是由于默认副本为1,broker挂了导致分区leader无法选举,一般集群搭建时就要改为3

    1. util.AlterKafkaConsumerOffset$: 元信息topic:t_swt,partition:1,0
    2. Could not fetch offset for [t_table,23] due to
    3. org.apache.kafka.common.errors.NotCoordinatorForGroupException:
    4. This is not the correct coordinator for this group.

转载于:https://my.oschina.net/liufukin/blog/3100055

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/125500
推荐阅读
相关标签
  

闽ICP备14008679号