赞
踩
上节我们完成了如下内容:
现实中业务中我们遇到了分区副本数量想要调整的问题,假设起初我们的分区副本数只有1,想要修改为2、3来保证当部分Kafka的Broker宕机时,仍然可以提供服务给我们,但是不可以用脚本直接修改,所以我们通过JSON+脚本的方式,来达到Kafka副本分区的调整。
在Kafka中,每个Topic会包含多个分区,默认情况下一个分区只会被一个消费组下面的一个消费者消费,这里就产生了分区分配的问题。
Kafka中提供了多重分区分配算法(PartitionAssignor):
尽管 RoundRobinAssignor 已经在 RangeAssignoror 上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。
更核心的问题是无论是 RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次的分配结果,尽量少的调整分区分配的变动,显然是能减少很多开销的。
Sticky是“粘性的”,可以理解为分配是带粘性的:
当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个才是真正体现出 StickyAssignor 特性的。
假设当前有如下内容:
如果 C1 宕机,此时 StickyAssignor 的结果:
需要实现:org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口
其中定义了两个内部类:
Kafka还提供了一个抽象类:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化 PartitionAssignor 接口的实现,对 assign() 方法进行了实现,其中将Subscription的 userData信息去掉后,在进行分配。
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; import java.nio.ByteBuffer; import java.util.*; public class WeightedPartitionAssignor implements ConsumerPartitionAssignor { @Override public Subscription subscription(Set<String> topics) { // 在这里添加权重信息到 userData ByteBuffer buffer = ByteBuffer.allocate(4); buffer.putInt(getWeight()); buffer.flip(); return new Subscription(new ArrayList<>(topics), buffer); } @Override public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) { Map<String, Assignment> assignments = new HashMap<>(); Map<TopicPartition, List<String>> partitionConsumers = new HashMap<>(); // 遍历所有订阅的topics for (String topic : metadata.topics()) { List<TopicPartition> partitions = metadata.partitionsForTopic(topic); for (TopicPartition partition : partitions) { partitionConsumers.putIfAbsent(partition, new ArrayList<>()); } } // 根据权重分配分区 for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) { String consumerId = subscriptionEntry.getKey(); Subscription subscription = subscriptionEntry.getValue(); int weight = subscription.userData().getInt(); for (String topic : subscription.topics()) { List<TopicPartition> partitions = metadata.partitionsForTopic(topic); for (TopicPartition partition : partitions) { List<String> consumers = partitionConsumers.get(partition); for (int i = 0; i < weight; i++) { consumers.add(consumerId); // 权重高的消费者多次添加,增加选中的机会 } } } } // 随机分配分区给消费者 Random random = new Random(); for (Map.Entry<TopicPartition, List<String>> entry : partitionConsumers.entrySet()) { List<String> consumers = entry.getValue(); String assignedConsumer = consumers.get(random.nextInt(consumers.size())); assignments.computeIfAbsent(assignedConsumer, k -> new Assignment(new ArrayList<>())) .partitions().add(entry.getKey()); } return assignments; } @Override public void onAssignment(Assignment assignment, Cluster metadata) { // 可以在这里处理分配后的逻辑,比如保存当前分配的快照 } @Override public String name() { return "weighted"; } private int getWeight() { // 获取权重,可以从配置文件或环境变量中获取 return 10; // 默认权重为10 } }
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, WeightedPartitionAssignor.class.getName());
// 配置其他消费者属性
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。