赞
踩
目录
如果key=null,并且采用默认分区器,就会轮询均匀分布在各个分区
如果key不为null,使用默认分区器,会计算散列值,所以同一个key每次都会落到同一个分区上;如果增加了分区,就无法保证落到同一个分区上了
比如电商服务,大城市的业务量明显比中小城市高,可以单独为大城市自定义分区处理
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
- import org.apache.kafka.common.PartitionInfo;
-
- import java.util.List;
- import java.util.Map;
-
- /**
- * 自定义分区器
- *
- * @author honry.guan
- * @date 2021-05-07 9:21
- */
- public class MyPartitioner implements Partitioner {
- /**
- * 自定义分区方法
- */
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
- //分区数量
- int num = partitionInfos.size();
- //根据value与分区数求余的方式得到分区ID
- return value.hashCode() % num;
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void configure(Map<String, ?> map) {
-
- }
- }

- package cn.enjoyedu.selfpartition;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
-
- /**
- * MyPartitionerProducer
- *
- * @author honry.guan
- * @date 2021-05-07 9:51
- */
- public class MyPartitionerProducer {
- public static void main(String[] args) {
- Properties properties = new Properties();
- //配置连接ip和地址
- properties.put("bootstrap.servers", "127.0.0.1:9092");
- //kafka自带序列化器,可以不用谢全类路径StringSerializer.class也可以,这里作为演示
- properties.put("key.serializer", StringSerializer.class);
- properties.put("value.serializer", StringSerializer.class);
- properties.put("partitioner.class", MyPartitioner.class);
- KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
-
- try {
- //使用自定义分区器
- ProducerRecord<String, String> producerRecord = new ProducerRecord<>("my-hello", "name", "tom");
- Future<RecordMetadata> send = producer.send(producerRecord);
- //这里会阻塞,直到发送成功
- RecordMetadata recordMetadata = send.get();
- if (recordMetadata != null) {
- System.out.println("偏移量:" + recordMetadata.offset() + "-" + "分区:" + recordMetadata.partition());
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- } finally {
- //关闭连接
- producer.close();
- }
- }
- }

以下是复制的RoundRobinAssignor对象中的实现方法
- import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.utils.CircularIterator;
- import org.apache.kafka.common.utils.Utils;
-
- import java.util.*;
-
- /**
- * @author: honry.guan
- * @create: 2021-05-07 21:52
- **/
- public class MyCustomerPartitioner extends AbstractPartitionAssignor {
- /**
- *
- * @param partitionsPerTopic 所订阅的每个 topic 与其 partition 数的对应关系
- * @param subscriptions 每个 consumerId 与其所订阅的 topic 列表的关系。
- * @return
- */
- @Override
- public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
-
- Map<String, List<TopicPartition>> assignment = new HashMap<>();
- for (String memberId : subscriptions.keySet())
- assignment.put(memberId, new ArrayList<>());
-
- CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
- for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
- final String topic = partition.topic();
- while (!subscriptions.get(assigner.peek()).topics().contains(topic))
- assigner.next();
- assignment.get(assigner.next()).add(partition);
- }
- return assignment;
- }
- public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
- Map<String, Subscription> subscriptions) {
- SortedSet<String> topics = new TreeSet<>();
- for (Subscription subscription : subscriptions.values())
- topics.addAll(subscription.topics());
-
- List<TopicPartition> allPartitions = new ArrayList<>();
- for (String topic : topics) {
- Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
- if (numPartitionsForTopic != null)
- allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
- }
- return allPartitions;
- }
- @Override
- public String name() {
- return null;
- }
- }

- public class HelloKafkaConsumer {
-
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put("bootstrap.servers","127.0.0.1:9092");
- properties.put("key.deserializer", StringDeserializer.class);
- properties.put("value.deserializer", StringDeserializer.class);
- //使用自定义分区策略
- properties.put("partition.assignment.strategy", MyCustomerPartitioner.class);
-
- //群组
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
- KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
- try {
- //消费者订阅主题(可以多个)
- consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));
- while(true){
- //TODO 拉取(新版本)
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
- for(ConsumerRecord<String, String> record:records){
- System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
- record.offset(),record.key(),record.value()));
- //do my work
- //打包任务投入线程池
- }
- }
- } finally {
- consumer.close();
- }
-
- }
-
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。