当前位置:   article > 正文

kafka学习(3)-Group 、分区、多线程消费_kafka group

kafka group

1、分区、Group原理图

                      

2、原理描述

一个topic 可以配置几个partition,produce发送的消息分发到不同的partition中,consumer接受数据的时候是按照group来接受,kafka确保每个partition只能同一个group中的同一个consumer消费,如果想要重复消费,那么需要其他的组来消费。Zookeerper中保存这每个topic下的每个partition在每个group中消费的offset 。
新版kafka把这个offsert保存到了一个__consumer_offsert的topic下 
这个__consumer_offsert 有50个分区,通过将group的id哈希值%50的值来确定要保存到那一个分区. 这样也是为了考虑到zookeeper不擅长大量读写的原因。 
所以,如果要一个group用几个consumer来同时读取的话,需要多线程来读取,一个线程相当于一个consumer实例。当consumer的数量大于分区的数量的时候,有的consumer线程会读取不到数据。 
假设一个topic test 被groupA消费了,现在启动另外一个新的groupB来消费test,默认test-groupB的offset不是0,而是没有新建立,除非当test有数据的时候,groupB会收到该数据,该条数据也是第一条数据,groupB的offset也是刚初始化的ofsert, 除非用显式的用–from-beginnging 来获取从0开始数据 

3、消费者群组-Group

消费者可以通过使用samegroup.id加入一个组。

  • 一个组的最大并行度是该组中的消费者的数量 ← 分区的数量。
  • Kafka将一个主题的分区分配给组中的使用者,以便每个分区仅由组中的一位消费者使用(如果想要重复消费,那么需要其他的组来消费)。
  • Kafka保证只有群组中的单个消费者阅读消息。
  • 消费者可以按照存储在日志中的顺序查看消息。

4、重复消费

问题:

kafka消费者使用自动提交的模式,提交间隔为2s,消费者在获取数据的时候处理0.5s,从kafka拉取过来的数据只够处理1秒。那么消费者下次拉取过来的数据是否是已经消费完的数据?或者说由于数据已经消费,但是偏移量没有被提交,是否会造成下次获取的数据是从旧的偏移量开始拉取?

 

 

 

回答:

不会是旧数据,kafka的消费者也有自己偏移量,这个偏移量是从kafka中读取的量,和kafka提交的偏移量不一样。假设变成自动提交偏移量,而且没有写提交的逻辑,同一个消费者,除了第一次或者rebalance会根据已提交的offset来获取数据,剩下的时候都是根据自己本地的偏移量来获取的。这个模式有点类似于用桶取水,用瓢来喝水。消费者就是桶的角色,poll就是瓢的角色。

 

 

 

重复消费的情况

我们把重复消费的情况分为2种,一种是想避免的,一种是故意如此的。

想避免的场景

  1. 消费者使用了自动提交模式,当还没有提交的时候,有新的消费者加入或者移除,发生了rebalance。再次消费的时候,消费者会根据提交的偏移量来,于是重复消费了数据。
  2. 使用异步提交,并且在callback里写了失败重试,但是没有注意顺序。例如提交5的时候,发送网络故障,由于是异步,程序继续运行,再次提交10的时候,提交成功,此时正好运行到5的重试,并且成功。当发生了rebalance,又会重复消费了数据。

注:这里不讨论那个消费者提交的offset的作用。

故意的场景

  1. 使用不同的组消费同一个topic。改个 group.id属性即可。
  2. 自己手动提交偏移量。
    这里的麻烦的地方就是需要理解开头的问题,并不是说你提交完就可以了。你得想个办法去读取那个偏移量再次消费。下面提供一个暴力的手段,关闭消费者,然后再次开启新的。

 

  1. public static void consumer(Properties properties,String info) {
  2. System.out.println(info);
  3. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
  4. kafkaConsumer.subscribe(Arrays.asList(new String[]{"hello"}));
  5. boolean flag = false;
  6. while (true) {
  7. ConsumerRecords<String, String> poll = kafkaConsumer.poll(100);
  8. if (!poll.isEmpty()) {
  9. for (ConsumerRecord<String, String> o : poll) {
  10. System.out.println(o.value() + o.offset());
  11. //假设场景为重复消费3,这里需要根据业务来提交便宜量
  12. if (o.offset() == 3) {
  13. //手动提交偏移量
  14. Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<TopicPartition, OffsetAndMetadata>();
  15. //提交的偏移量,这个偏移量就是下次消费的第一条数据
  16. currentOffset.put(new TopicPartition(o.topic(), o.partition()), new OffsetAndMetadata(o.offset()+1, ""));
  17. kafkaConsumer.commitSync(currentOffset);
  18. flag = true;
  19. break;
  20. }
  21. }
  22. }
  23. if(flag){
  24. kafkaConsumer.close();
  25. break;
  26. }
  27. }
  28. }

这里也必须注意,kafka并不是数据库,他保存的数据有持久化的时间和大小的限制,可能你提交的偏移量的数据已经被kafka清理掉了。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/125402?site
推荐阅读
相关标签
  

闽ICP备14008679号