赞
踩
问题引入:一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据
Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky
可以通过配置参数partition.assignment.strategy,修改分区的分配策略。Kafka可以同时使用多个分区分配策略
说明:默认策略是Range + CooperativeSticky
package com.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; /** * @author wangbo * @version 1.0 */ /** * 分区策略和再平衡 */ public class CustomConsumer_02 { public static void main(String[] args) { //配置 Properties properties = new Properties(); //连接集群 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092"); //多写一个,避免其中一台挂掉,保证数据的可靠性 //反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //配置消费者组ID 可以任意起 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); //设置分区策略,一共有四种 Range、RoundRobin、Sticky、CooperativeSticky,默认策略是Range + CooperativeSticky properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RangeAssignor"); //Range分区策略 // properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor"); //RoundRobin分区策略 // properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor"); //Sticky分区策略 // properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); //CooperativeSticky分区策略 //1.创建一个消费者 "","hello" KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); //2.订阅主题 first3 ArrayList<String> topics = new ArrayList<String>(); topics.add("first3"); kafkaConsumer.subscribe(topics); //3.消费数据 while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据 //循环打印消费的数据 consumerRecords.for for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }
Range是对每个topic而言的,首先对同一个topic里面的分区按照序号进行排序,而消费者按照字母顺序进行排序。
例如:7个分区,3个消费者,排序后分区为0,1,2,3,4,5,6;消费者排序之后为C0,C1,C2
通过分区数 / 消费者数来决定每个消费者消费几个分区,如果除不尽,那么前几个消费者将会多消费数据
例如: 7/3 = 2 余 1,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个
缺点:数据量如果非常大,容易造成数据倾斜
例如:如果有 N 多个 topic,那么针对每个topic,消费者 C0都将多消费 1 个分区,topic越多,C0消 费的分区会比其他消费者明显多消费 N 个分区
引入:假如有7个分区,3个消费者,排序后分区为0,1,2,3,4,5,6;消费者排序之后为C0,C1,C2
C0 号消费者:消费到 0、1、2 号分区数据
C1 号消费者:消费到 3、4 号分区数据
C2 号消费者:消费到 5、6 号分区数据
(1)停止掉C0号消费者,快速重新发送消息观看结果(45s 以内,越快越好)
C0号消费者的任务会整体被分配到C1号消费者或者C2号消费者
说明:C0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行
(2)停止掉C0号消费者,快速重新发送消息给消费者观看结果(45s 以后)
C1 号消费者:消费到 0、1、2、3 号分区数据
C2 号消费者:消费到 4、5、6 号分区数据
说明:消费者 C0 已经被踢出消费者组,所以重新按照 Range分配策略进行分配
RoundRobin 针对集群中所有Topic而言,RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者
例如:C0消费0号分区,C1消费1号分区,C2消费2号分区,C0消费3号分区,以此类推
引入:假如有7个分区,3个消费者,排序后分区为0,1,2,3,4,5,6;消费者排序之后为C0,C1,C2
C0 号消费者:消费到 0、3、6 号分区数据
C1 号消费者:消费到 2、5 号分区数据
C2 号消费者:消费到 4、1 号分区数据
(1)停止掉 C0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)
C0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、3 和 6 号分区数据,分别由 C1 号消费者或者 C2 号消费者消费
说明:C0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行
(2)停止掉C0号消费者,快速重新发送消息给消费者观看结果(45s 以后)
C1 号消费者:消费到 0、2、4、6 号分区数据
C2 号消费者:消费到 1、3、5 号分区数据
说明:消费者 C0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配
Sticky(粘性)分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销
例如:7个分区,3个消费者,排序后分区为0,1,2,3,4,5,6;消费者排序之后为C0,C1,C2
C0 号消费者:消费到 2、3、5 号分区数据
C1 号消费者:消费到 1、6 号分区数据
C2 号消费者:消费到 0、4 号分区数据
有点类似Range策略,但是分区是随机的,不是按照顺序来的
(1)停止掉 C0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)
C1 号消费者:消费到 2、5、3 号分区数据
C2 号消费者:消费到 4、6 号分区数据
C0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 C1 号消费者或者 C2 号消费者消费。
说明:C0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行
(2)停止掉C0号消费者,快速重新发送消息给消费者观看结果(45s 以后)
C1 号消费者:消费到 2、3、5 号分区数据
C2 号消费者:消费到 0、1、4、6 号分区数据
说明:消费者 C0 已经被踢出消费者组,所以重新按照粘性方式分配
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。