当前位置:   article > 正文

kafka自定义生产者分区器、自定义消费者分区器_kafka 自定义分区

kafka 自定义分区

目录

1 默认分区

1.1 键key的作用

1.2 键的分区

2 生产者自定义分区

2.1 使用场景分析

2.2 自定义分区器要实现Partitioner接口

2.3 生产者使用分区器

3 消费者自定义分区

3.1 默认的分区策略

3.2 自定义分区策略

3.3 消费者使用自定义策略


1 默认分区

1.1 键key的作用

  1. 决定消息在主题的哪个分区
  2. 作为消息的附加信息

1.2 键的分区

如果key=null,并且采用默认分区器,就会轮询均匀分布在各个分区

如果key不为null,使用默认分区,会计算散列值,所以同一个key每次都会落到同一个分区上;如果增加了分区,就无法保证落到同一个分区上了

2 生产者自定义分区

2.1 使用场景分析

比如电商服务,大城市的业务量明显比中小城市高,可以单独为大城市自定义分区处理

2.2 自定义分区器要实现Partitioner接口

  1. import org.apache.kafka.clients.producer.Partitioner;
  2. import org.apache.kafka.common.Cluster;
  3. import org.apache.kafka.common.PartitionInfo;
  4. import java.util.List;
  5. import java.util.Map;
  6. /**
  7. * 自定义分区器
  8. *
  9. * @author honry.guan
  10. * @date 2021-05-07 9:21
  11. */
  12. public class MyPartitioner implements Partitioner {
  13. /**
  14. * 自定义分区方法
  15. */
  16. @Override
  17. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  18. List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
  19. //分区数量
  20. int num = partitionInfos.size();
  21. //根据value与分区数求余的方式得到分区ID
  22. return value.hashCode() % num;
  23. }
  24. @Override
  25. public void close() {
  26. }
  27. @Override
  28. public void configure(Map<String, ?> map) {
  29. }
  30. }

2.3 生产者使用分区器

  1. package cn.enjoyedu.selfpartition;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import java.util.Properties;
  7. import java.util.concurrent.ExecutionException;
  8. import java.util.concurrent.Future;
  9. /**
  10. * MyPartitionerProducer
  11. *
  12. * @author honry.guan
  13. * @date 2021-05-07 9:51
  14. */
  15. public class MyPartitionerProducer {
  16. public static void main(String[] args) {
  17. Properties properties = new Properties();
  18. //配置连接ip和地址
  19. properties.put("bootstrap.servers", "127.0.0.1:9092");
  20. //kafka自带序列化器,可以不用谢全类路径StringSerializer.class也可以,这里作为演示
  21. properties.put("key.serializer", StringSerializer.class);
  22. properties.put("value.serializer", StringSerializer.class);
  23. properties.put("partitioner.class", MyPartitioner.class);
  24. KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
  25. try {
  26. //使用自定义分区器
  27. ProducerRecord<String, String> producerRecord = new ProducerRecord<>("my-hello", "name", "tom");
  28. Future<RecordMetadata> send = producer.send(producerRecord);
  29. //这里会阻塞,直到发送成功
  30. RecordMetadata recordMetadata = send.get();
  31. if (recordMetadata != null) {
  32. System.out.println("偏移量:" + recordMetadata.offset() + "-" + "分区:" + recordMetadata.partition());
  33. }
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. } catch (ExecutionException e) {
  37. e.printStackTrace();
  38. } finally {
  39. //关闭连接
  40. producer.close();
  41. }
  42. }
  43. }

3 消费者自定义分区

3.1 默认的分区策略

partition.assignment.strategy 分区分配给消费者的策略。系统提供两种策略。默认为 Range 。允许自定义策略。
分区有1,2,3,4,消费者有a,b
  1. Range 把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区) :a管分区1,2,b管分区3,4
  2. RoundRobin 把主题的分区循环分配给消费者:a管分区1,3,b管分区2,4

3.2 自定义分区策略

以下是复制的RoundRobinAssignor对象中的实现方法

  1. import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
  2. import org.apache.kafka.common.TopicPartition;
  3. import org.apache.kafka.common.utils.CircularIterator;
  4. import org.apache.kafka.common.utils.Utils;
  5. import java.util.*;
  6. /**
  7. * @author: honry.guan
  8. * @create: 2021-05-07 21:52
  9. **/
  10. public class MyCustomerPartitioner extends AbstractPartitionAssignor {
  11. /**
  12. *
  13. * @param partitionsPerTopic 所订阅的每个 topic 与其 partition 数的对应关系
  14. * @param subscriptions 每个 consumerId 与其所订阅的 topic 列表的关系。
  15. * @return
  16. */
  17. @Override
  18. public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
  19. Map<String, List<TopicPartition>> assignment = new HashMap<>();
  20. for (String memberId : subscriptions.keySet())
  21. assignment.put(memberId, new ArrayList<>());
  22. CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
  23. for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
  24. final String topic = partition.topic();
  25. while (!subscriptions.get(assigner.peek()).topics().contains(topic))
  26. assigner.next();
  27. assignment.get(assigner.next()).add(partition);
  28. }
  29. return assignment;
  30. }
  31. public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic,
  32. Map<String, Subscription> subscriptions) {
  33. SortedSet<String> topics = new TreeSet<>();
  34. for (Subscription subscription : subscriptions.values())
  35. topics.addAll(subscription.topics());
  36. List<TopicPartition> allPartitions = new ArrayList<>();
  37. for (String topic : topics) {
  38. Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
  39. if (numPartitionsForTopic != null)
  40. allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
  41. }
  42. return allPartitions;
  43. }
  44. @Override
  45. public String name() {
  46. return null;
  47. }
  48. }

3.3 消费者使用自定义策略

 

  1. public class HelloKafkaConsumer {
  2. public static void main(String[] args) {
  3. Properties properties = new Properties();
  4. properties.put("bootstrap.servers","127.0.0.1:9092");
  5. properties.put("key.deserializer", StringDeserializer.class);
  6. properties.put("value.deserializer", StringDeserializer.class);
  7. //使用自定义分区策略
  8. properties.put("partition.assignment.strategy", MyCustomerPartitioner.class);
  9. //群组
  10. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
  11. KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
  12. try {
  13. //消费者订阅主题(可以多个)
  14. consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));
  15. while(true){
  16. //TODO 拉取(新版本)
  17. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
  18. for(ConsumerRecord<String, String> record:records){
  19. System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
  20. record.offset(),record.key(),record.value()));
  21. //do my work
  22. //打包任务投入线程池
  23. }
  24. }
  25. } finally {
  26. consumer.close();
  27. }
  28. }
  29. }

 

 

 

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/700566
推荐阅读
相关标签
  

闽ICP备14008679号