当前位置:   article > 正文

kafka:生产者发送消息的分区策略_kafka如何确定下一条消息发到哪个partition

kafka如何确定下一条消息发到哪个partition

Kafka为了增加系统的伸缩性(Scalability),引入了分区(Partitioning)的概念。

        Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。

        通过这个设计,就可以以分区这个粒度进行数据读写操作,每个Broker的各个分区独立处理请求,进而实现负载均衡,提升了整体系统的吞吐量。

        分区策略决定生产者将消息发送到哪个分区的算法

1、默认的分区器

        kafka在数据生产的时候,有一个数据分发策略。默认的情况使用DefaultPartitioner.class类。这个类中就是定义数据分发的策略。

kafka默认的分区器 org.apache.kafka.clients.producer.internals.DefaultPartitioner网址:yii666.com

使用默认分区器,生产者创建消息时,根据 参数决定发送到哪个分区:

1.1、黏性分区策略(2.4.0之前是轮询)- 未指定分区、key

kafka学习(四):生产者发送消息的分区策略文章地址https://www.yii666.com/blog/358380.html

        既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直 使用该分区,待该分区的batch已满或者已完成,Kafka再随机选一个分区进行使用(和上一次的分区不同)。 

        Sticky Partitioning Strategy会随机地选择一个分区并会尽可能地坚持使用该分区——即所谓的粘住这个分区

原因:

        kafka 在发送消息的时候 , 采用批处理方案 , 当达到一批后进行分送 , 但是如果一批数据中有不同分区的数据 , 就无法放置到一个批处理中, 而老版本中轮询方案 , 就会导致一批数据被分到多个小的批次中 , 从而影响效率 , 故在新版本中 , 采用这种粘性的划分策略。文章来源地址https://www.yii666.com/blog/358380.html网址:yii666.com<

例如:

        第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进 行使用(如果还是0会继续随机)。

1.2、hash分区策略

kafka学习(四):生产者发送消息的分区策略

          没有指明partition值,但有key的情况下,将keyhash值与topic的 partition数进行取余得到partition值。 Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions

        注意: 如果 key 一直不变,同一个 key 算出来的 hash 值是个固定值。如果是固定值,这种 hash 取模就没有意义。
        例如:

 key1hash=5, key2hash=6 ,topicpartition=2,那 么key1** 对应的value1写入1号分区,key2对应的value2写入0号分区。

1.3、指定partition策略

        以上两种构造都会通过DefaultPartitioner进行数据分发操作。但指定分区后,不会调用DefaultPartitioner.partition() 方法。

        kafka学习(四):生产者发送消息的分区策略

        指明partition的情况下,直接将指明的值作为partition值; 例如partition=0,所有数据写入分区0。文章来源地址:https://www.yii666.com/blog/358380.html

2、自定义分区策略

         自定义分区策略 跟DefaultPartitioner实现方式一样。

kafka学习(四):生产者发送消息的分区策略

1、创建一个类,实现Partitioner接口。

2、重写 partitioner中的方法,

        partitioner()方法的参数说明:

                参数1:topic       

                参数2:key值

                参数3:key值字节数组

                参数4:value数据

                参数5:value数据的字节数组

                参数6:集群对象

3、在 partitioner() 方法中编写自定义分区逻辑,返回分区编号。

4、在生产者配置信息中进行配置自定义分区:

spring.kafka.producer.properties.partitioner.class=配置类全路径

代码示例:

  1. @Component
  2. public class MyPartitioner implements Partitioner {
  3. @Override
  4. public int partition(String topic, Object key, byte[] keyBytes,
  5. Object value, byte[] valueBytes, Cluster cluster) {
  6. String msgValues = value.toString();
  7. int partition;
  8. if (msgValues.contains("test")){
  9. partition = 0;
  10. }else {
  11. partition = 1;
  12. }
  13. return partition;
  14. }
  15. @Override
  16. public void close() {
  17. //Nothing to close
  18. }
  19. @Override
  20. public void configure(Map<String, ?> configs) {
  21. }
  22. }
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号