Kafka version: 2.6.2
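Normally, when consuming from Kafka, we specify only a consumer group and leave the strategy that distributes partitions among the group's consumers at its default. The strategy can also be set explicitly. The client source contains several built-in assignors, including RangeAssignor, RoundRobinAssignor and StickyAssignor (plus CooperativeStickyAssignor, added in 2.4).
To specify the assignment strategy: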
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
The entry point for partition assignment is the performAssignment method of ConsumerCoordinator, which runs on the consumer elected as group leader:
@Override
protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                    String assignmentStrategy,
                                                    List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {
    // Look up the assignor for the negotiated strategy
    ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    // Every topic subscribed to by any member of the group
    Set<String> allSubscribedTopics = new HashSet<>();
    // Per-member subscription info (e.g. metadata), keyed by member id
    Map<String, Subscription> subscriptions = new HashMap<>();
    Map<String, List<TopicPartition>> ownedPartitions = new HashMap<>();
    for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {
        Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));
        subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));
        subscriptions.put(memberSubscription.memberId(), subscription);
        allSubscribedTopics.addAll(subscription.topics());
        ownedPartitions.put(memberSubscription.memberId(), subscription.ownedPartitions());
    }
    // For RangeAssignor and RoundRobinAssignor, assign() is implemented in
    // AbstractPartitionAssignor, the abstract base class of these assignors
    Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
    ...
    log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments);
    ...
    return groupAssignment;
}
The assign() method of AbstractPartitionAssignor:
// The strategy-specific algorithm, implemented by each assignor
public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                         Map<String, Subscription> subscriptions);

@Override
public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
    Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
    Set<String> allSubscribedTopics = new HashSet<>();
    for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet())
        allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());

    Map<String, Integer> partitionsPerTopic = new HashMap<>();
    for (String topic : allSubscribedTopics) {
        Integer numPartitions = metadata.partitionCountForTopic(topic);
        if (numPartitions != null && numPartitions > 0)
            partitionsPerTopic.put(topic, numPartitions);
        else
            log.debug("Skipping assignment for topic {} since no metadata is available", topic);
    }

    // Arguments: partitionsPerTopic maps each topic to its partition count;
    // subscriptions maps each member id to its Subscription (subscribed topics, etc.)
    Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, subscriptions);

    // this class maintains no user data, so just wrap the results
    Map<String, Assignment> assignments = new HashMap<>();
    for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
        assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
    return new GroupAssignment(assignments);
}
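As a rough illustration of the first half of assign(), here is a stand-alone sketch that builds partitionsPerTopic from the members' subscriptions. It is not the Kafka code: the plain `Map<String, Integer>` called partitionCounts is an assumption standing in for `Cluster.partitionCountForTopic()`, subscriptions are reduced to member id → topic list, and the class name is hypothetical.

```java
import java.util.*;

// Sketch of how partitionsPerTopic is derived before the strategy-specific
// assign() is called. "partitionCounts" is a hypothetical stand-in for the
// cluster metadata lookup.
class PartitionsPerTopicSketch {

    static Map<String, Integer> partitionsPerTopic(
            Map<String, List<String>> subscriptions,   // memberId -> subscribed topics
            Map<String, Integer> partitionCounts) {    // hypothetical metadata view
        // Union of every topic subscribed to by any member of the group
        Set<String> allSubscribedTopics = new HashSet<>();
        for (List<String> topics : subscriptions.values()) {
            allSubscribedTopics.addAll(topics);
        }
        Map<String, Integer> result = new HashMap<>();
        for (String topic : allSubscribedTopics) {
            Integer numPartitions = partitionCounts.get(topic);
            if (numPartitions != null && numPartitions > 0) {
                result.put(topic, numPartitions);
            } else {
                // Mirrors the log.debug("Skipping assignment ...") branch
                System.out.println("Skipping assignment for topic " + topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> subs = new HashMap<>();
        subs.put("consumer-A", Arrays.asList("test_topic_partition_one", "test_topic_partition_two"));
        subs.put("consumer-B", Arrays.asList("test_topic_partition_one", "unknown_topic"));
        Map<String, Integer> counts = new HashMap<>();
        counts.put("test_topic_partition_one", 9);
        counts.put("test_topic_partition_two", 9);
        System.out.println(partitionsPerTopic(subs, counts));
    }
}
```

A topic with no known metadata is simply left out, so the strategy-specific assign() never sees it.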
The following compares the two strategies RangeAssignor and RoundRobinAssignor.
RangeAssignor is the default strategy in this version.
public class RangeAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "range";
    }

    private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
        Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
            for (String topic : subscriptionEntry.getValue().topics()) {
                put(topicToConsumers, topic, memberInfo);
            }
        }
        return topicToConsumers;
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // partitionsPerTopic: topic -> number of partitions
        // subscriptions: member id -> Subscription (subscribed topics, group instance id, ...)
        // Build topic -> list of consumers subscribed to it
        Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
        // Debug output: group-one has two consumers,
        //   consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3 and
        //   consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd.
        // consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd subscribes to test_topic_partition_one and test_topic_partition_two;
        // consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3 subscribes only to test_topic_partition_one:
        // consumersPerTopic: {test_topic_partition_one=[MemberInfo [member.id: consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3, group.instance.id: {}], MemberInfo [member.id: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd, group.instance.id: {}]], test_topic_partition_two=[MemberInfo [member.id: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd, group.instance.id: {}]]}

        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<>());

        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
            // The topic
            String topic = topicEntry.getKey();
            // Consumers subscribed to this topic
            List<MemberInfo> consumersForTopic = topicEntry.getValue();
            // Number of partitions of this topic
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;

            Collections.sort(consumersForTopic);
            // Minimum number of partitions each consumer receives
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            // Leftover partitions: the first consumersWithExtraPartition consumers get one extra each
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            // All partitions of this topic
            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                // Consumers earlier in sorted order receive one extra partition
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                // Each consumer gets a contiguous range of the topic's partitions
                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }
}
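The only arithmetic in RangeAssignor is the start/length calculation. The sketch below restates it outside Kafka (class and method names are hypothetical) so the split can be checked for arbitrary partition and consumer counts.

```java
import java.util.*;

// Restates the per-topic split from RangeAssignor.assign(): consumer i
// (in sorted order) receives `length` partitions starting at `start`.
class RangeSplitSketch {

    /** Returns, for each consumer index, the partition ids it receives. */
    static List<List<Integer>> split(int numPartitions, int numConsumers) {
        int perConsumer = numPartitions / numConsumers;  // minimum share
        int extra = numPartitions % numConsumers;        // consumers that get one more
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            int start = perConsumer * i + Math.min(i, extra);
            int length = perConsumer + (i + 1 > extra ? 0 : 1);
            List<Integer> ids = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                ids.add(p);
            }
            result.add(ids);
        }
        return result;
    }

    public static void main(String[] args) {
        // 9 partitions, 2 consumers: the first consumer gets the extra partition
        System.out.println(split(9, 2));  // [[0, 1, 2, 3, 4], [5, 6, 7, 8]]
        // 10 partitions, 3 consumers
        System.out.println(split(10, 3)); // [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
    }
}
```

Because every range is contiguous and the leftovers always go to the lowest indices, the consumers that sort first never receive fewer partitions of a topic than the others.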
Example: a consumer group with two consumers, where test_topic_partition_one and test_topic_partition_two each have 9 partitions.
Process 1:
props.put("group.id", "group-one");
props.put("auto.offset.reset", "latest");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test_topic_partition_one", "test_topic_partition_two"));
Process 2:
props.put("group.id", "group-one");
props.put("auto.offset.reset", "latest");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test_topic_partition_one"));
Applying the algorithm above gives:
Consumer: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd
Assigned partitions:
test_topic_partition_one-0,
test_topic_partition_one-1,
test_topic_partition_one-2,
test_topic_partition_one-3,
test_topic_partition_one-4,
test_topic_partition_two-0,
test_topic_partition_two-1,
test_topic_partition_two-2,
test_topic_partition_two-3,
test_topic_partition_two-4,
test_topic_partition_two-5,
test_topic_partition_two-6,
test_topic_partition_two-7,
test_topic_partition_two-8
Consumer: consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3
Assigned partitions:
test_topic_partition_one-5,
test_topic_partition_one-6,
test_topic_partition_one-7,
test_topic_partition_one-8
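To double-check the result above, here is a minimal simulation of RangeAssignor that uses plain strings instead of Kafka's MemberInfo and TopicPartition. It assumes no group.instance.id is set, so members are ordered by member id alone, and it uses shortened member ids. The class and method names are hypothetical.

```java
import java.util.*;

// Simulates RangeAssignor with plain strings. Without group.instance.id,
// sorting member ids lexicographically matches MemberInfo ordering (assumption).
class RangeAssignSketch {

    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        // topic -> members subscribed to it (TreeMap: topics visited in name order)
        Map<String, List<String>> consumersPerTopic = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : subscriptions.entrySet()) {
            for (String topic : e.getValue()) {
                consumersPerTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(e.getKey());
            }
        }
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String memberId : subscriptions.keySet()) {
            assignment.put(memberId, new ArrayList<>());
        }
        for (Map.Entry<String, List<String>> e : consumersPerTopic.entrySet()) {
            String topic = e.getKey();
            List<String> consumers = e.getValue();
            Integer numPartitions = partitionsPerTopic.get(topic);
            if (numPartitions == null) continue;
            Collections.sort(consumers); // lexicographic member id order
            int perConsumer = numPartitions / consumers.size();
            int extra = numPartitions % consumers.size();
            for (int i = 0; i < consumers.size(); i++) {
                int start = perConsumer * i + Math.min(i, extra);
                int length = perConsumer + (i + 1 > extra ? 0 : 1);
                for (int p = start; p < start + length; p++) {
                    assignment.get(consumers.get(i)).add(topic + "-" + p);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new HashMap<>();
        partitions.put("test_topic_partition_one", 9);
        partitions.put("test_topic_partition_two", 9);
        Map<String, List<String>> subs = new HashMap<>();
        // Shortened member ids from the example above
        subs.put("consumer-group-one-1-11580834", Arrays.asList("test_topic_partition_one", "test_topic_partition_two"));
        subs.put("consumer-group-one-1-504e90bc", Arrays.asList("test_topic_partition_one"));
        assign(partitions, subs).forEach((member, parts) -> System.out.println(member + " -> " + parts));
    }
}
```

Since "11580834" sorts before "504e90bc", the first member gets partitions 0 to 4 of test_topic_partition_one plus all of test_topic_partition_two, matching the listing.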
If process 2 also subscribes to both topics, applying the algorithm gives:
Consumer: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd
Assigned partitions:
test_topic_partition_one-0,
test_topic_partition_one-1,
test_topic_partition_one-2,
test_topic_partition_one-3,
test_topic_partition_one-4,
test_topic_partition_two-0,
test_topic_partition_two-1,
test_topic_partition_two-2,
test_topic_partition_two-3,
test_topic_partition_two-4
Consumer: consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3
Assigned partitions:
test_topic_partition_one-5,
test_topic_partition_one-6,
test_topic_partition_one-7,
test_topic_partition_one-8,
test_topic_partition_two-5,
test_topic_partition_two-6,
test_topic_partition_two-7,
test_topic_partition_two-8
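The first consumer gets one more partition of each topic than the second (10 versus 8 partitions in total), and each consumer's partitions of a topic form a contiguous range. The split is also computed per topic: a consumer's share of one topic is independent of every other topic. Because the consumers that sort first receive the leftover partition of every topic, the surplus adds up as more topics are subscribed, which can put noticeably more load on the first instance.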
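To see how the leftover partitions accumulate, the sketch below counts each consumer's total under RangeAssignor when all consumers subscribe to the same topics and every topic has the same partition count. The topic and partition numbers are illustrative, not measured, and the class name is hypothetical.

```java
import java.util.*;

// Per-consumer partition totals under RangeAssignor when every consumer
// subscribes to `numTopics` topics of `partitionsPerTopic` partitions each.
class RangeImbalanceSketch {

    static int[] totals(int numConsumers, int numTopics, int partitionsPerTopic) {
        int[] totals = new int[numConsumers];
        int perConsumer = partitionsPerTopic / numConsumers;
        int extra = partitionsPerTopic % numConsumers;
        for (int topic = 0; topic < numTopics; topic++) {
            // Every topic is split the same way, so the extra partition always
            // lands on the first `extra` consumers in sorted order.
            for (int i = 0; i < numConsumers; i++) {
                totals[i] += perConsumer + (i < extra ? 1 : 0);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(totals(2, 2, 9))); // [10, 8], the example above
        System.out.println(Arrays.toString(totals(2, 5, 3))); // [10, 5]
    }
}
```

With five 3-partition topics the first consumer already holds twice as many partitions as the second.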
The RoundRobinAssignor source:
public class RoundRobinAssignor extends AbstractPartitionAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        // Collect every member of the group; in this run there are two:
        //   consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5
        //   consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855a
        List<MemberInfo> memberInfoList = new ArrayList<>();
        for (Map.Entry<String, Subscription> memberSubscription : subscriptions.entrySet()) {
            assignment.put(memberSubscription.getKey(), new ArrayList<>());
            memberInfoList.add(new MemberInfo(memberSubscription.getKey(),
                                              memberSubscription.getValue().groupInstanceId()));
        }

        // Cycle over the members in sorted order
        CircularIterator<MemberInfo> assigner = new CircularIterator<>(Utils.sorted(memberInfoList));

        for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
            final String topic = partition.topic();
            // Skip members that are not subscribed to this partition's topic
            while (!subscriptions.get(assigner.peek().memberId).topics().contains(topic)) {
                assigner.next();
            }
            // Give the partition to the current member, then advance
            assignment.get(assigner.next().memberId).add(partition);
        }
        return assignment;
    }

    // All partitions of all subscribed topics, sorted by topic name, then partition id
    private List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
        SortedSet<String> topics = new TreeSet<>();
        for (Subscription subscription : subscriptions.values())
            topics.addAll(subscription.topics());

        List<TopicPartition> allPartitions = new ArrayList<>();
        for (String topic : topics) {
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic != null)
                allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
        }
        return allPartitions;
    }

    @Override
    public String name() {
        return "roundrobin";
    }
}
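For comparison, here is the same kind of stand-alone simulation for RoundRobinAssignor. It is a sketch that uses plain strings and an int cursor in place of Kafka's CircularIterator, again assuming members have no group.instance.id; the class and method names are hypothetical.

```java
import java.util.*;

// Simulates the RoundRobinAssignor loop: sort members, list all partitions
// sorted by topic then partition id, and deal them out with a circular cursor
// that skips members not subscribed to the partition's topic.
class RoundRobinSketch {

    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        List<String> members = new ArrayList<>(subscriptions.keySet());
        Collections.sort(members);
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String m : members) assignment.put(m, new ArrayList<>());

        // Sorted union of subscribed topics, as in allPartitionsSorted()
        SortedSet<String> topics = new TreeSet<>();
        for (List<String> t : subscriptions.values()) topics.addAll(t);

        int cursor = 0; // index of the member the circular iterator would peek at
        for (String topic : topics) {
            Integer n = partitionsPerTopic.get(topic);
            if (n == null) continue;
            for (int p = 0; p < n; p++) {
                // Skip members that do not subscribe to this topic; terminates
                // because every topic here comes from some member's subscription
                while (!subscriptions.get(members.get(cursor)).contains(topic)) {
                    cursor = (cursor + 1) % members.size();
                }
                assignment.get(members.get(cursor)).add(topic + "-" + p);
                cursor = (cursor + 1) % members.size();
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new HashMap<>();
        partitions.put("test_topic_partition_one", 9);
        partitions.put("test_topic_partition_two", 9);
        Map<String, List<String>> subs = new HashMap<>();
        // Shortened member ids from the run below
        subs.put("consumer-group-one-1-6c946240", Arrays.asList("test_topic_partition_one", "test_topic_partition_two"));
        subs.put("consumer-group-one-1-d227d230", Arrays.asList("test_topic_partition_one"));
        assign(partitions, subs).forEach((member, parts) -> System.out.println(member + " -> " + parts));
    }
}
```

Note that the cursor is not reset between topics: where one topic ends determines which member receives the first partition of the next topic.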
Example: a consumer group with two consumers, where test_topic_partition_one and test_topic_partition_two each have 9 partitions.
Process 1:
props.put("group.id", "group-one");
props.put("auto.offset.reset", "latest");
// Use the round-robin strategy
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test_topic_partition_one", "test_topic_partition_two"));
Process 2:
props.put("group.id", "group-one");
props.put("auto.offset.reset", "latest");
// Use the round-robin strategy
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test_topic_partition_one"));
Applying the algorithm above gives:
Consumer: consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5
Assigned partitions:
test_topic_partition_one-0,
test_topic_partition_one-2,
test_topic_partition_one-4,
test_topic_partition_one-6,
test_topic_partition_one-8,
test_topic_partition_two-0,
test_topic_partition_two-1,
test_topic_partition_two-2,
test_topic_partition_two-3,
test_topic_partition_two-4,
test_topic_partition_two-5,
test_topic_partition_two-6,
test_topic_partition_two-7,
test_topic_partition_two-8
Consumer: consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855a
Assigned partitions:
test_topic_partition_one-1,
test_topic_partition_one-3,
test_topic_partition_one-5,
test_topic_partition_one-7
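The partitions of test_topic_partition_one are dealt to the two consumers alternately, while all of test_topic_partition_two goes to the first consumer, the only one subscribed to it.
The corresponding log: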
2024-08-19 14:28:34 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Line:626] [Consumer clientId=consumer-group-one-1, groupId=group-one] Finished assignment for group at generation 44: {consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5=Assignment(partitions=[test_topic_partition_one-0, test_topic_partition_one-2, test_topic_partition_one-4, test_topic_partition_one-6, test_topic_partition_one-8, test_topic_partition_two-0, test_topic_partition_two-1, test_topic_partition_two-2, test_topic_partition_two-3, test_topic_partition_two-4, test_topic_partition_two-5, test_topic_partition_two-6, test_topic_partition_two-7, test_topic_partition_two-8]), consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855a=Assignment(partitions=[test_topic_partition_one-1, test_topic_partition_one-3, test_topic_partition_one-5, test_topic_partition_one-7])}
If process 2 also subscribes to both topics, the assignment becomes:
Consumer: consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5
Assigned partitions:
test_topic_partition_one-0,
test_topic_partition_one-2,
test_topic_partition_one-4,
test_topic_partition_one-6,
test_topic_partition_one-8,
test_topic_partition_two-1,
test_topic_partition_two-3,
test_topic_partition_two-5,
test_topic_partition_two-7
Consumer: consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855a
Assigned partitions:
test_topic_partition_one-1,
test_topic_partition_one-3,
test_topic_partition_one-5,
test_topic_partition_one-7,
test_topic_partition_two-0,
test_topic_partition_two-2,
test_topic_partition_two-4,
test_topic_partition_two-6,
test_topic_partition_two-8
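In other words, the partitions of all subscribed topics are pooled and dealt to the consumers in turn, so the result depends on all topics and their partition counts together rather than on each topic separately. Here each consumer ends up with 9 of the 18 partitions.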
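When every consumer subscribes to the same topics, the skip loop in RoundRobinAssignor never fires, so the pooled partitions are dealt out evenly and the totals differ by at most one. The sketch below (hypothetical class name, illustrative numbers) counts the totals. For the same input of five 3-partition topics and two consumers, RangeAssignor's per-topic split would give 10 and 5.

```java
import java.util.*;

// Per-consumer totals under round-robin dealing when all consumers share the
// same subscriptions, so no member is ever skipped.
class RoundRobinBalanceSketch {

    static int[] totals(int numConsumers, int numTopics, int partitionsPerTopic) {
        int[] totals = new int[numConsumers];
        int cursor = 0; // not reset between topics, as in the real iterator
        for (int topic = 0; topic < numTopics; topic++) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                totals[cursor]++;                     // deal one partition
                cursor = (cursor + 1) % numConsumers; // move to the next consumer
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(totals(2, 2, 9))); // [9, 9], the example above
        System.out.println(Arrays.toString(totals(2, 5, 3))); // [8, 7]
    }
}
```

If the subscriptions differ between consumers, this guarantee no longer holds, as the first round-robin example shows (14 versus 4).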