赞
踩
Kafka为了增加系统的伸缩性(Scalability),引入了分区(Partitioning)的概念。
Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
通过这个设计,就可以以分区这个粒度进行数据读写操作,每个Broker的各个分区独立处理请求,进而实现负载均衡,提升了整体系统的吞吐量。
分区策略是决定生产者将消息发送到哪个分区的算法。
kafka在数据生产的时候,有一个数据分发策略。默认的情况使用DefaultPartitioner.class类。这个类中就是定义数据分发的策略。
kafka默认的分区器: org.apache.kafka.clients.producer.internals.DefaultPartitioner网址:yii666.com
使用默认分区器,生产者创建消息时,根据 参数决定发送到哪个分区:
文章地址https://www.yii666.com/blog/358380.html
既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直 使用该分区,待该分区的batch已满或者已完成,Kafka再随机选一个分区进行使用(和上一次的分区不同)。
Sticky Partitioning Strategy会随机地选择一个分区并会尽可能地坚持使用该分区——即所谓的粘住这个分区。
原因:
kafka 在发送消息的时候 , 采用批处理方案 , 当达到一批后进行分送 , 但是如果一批数据中有不同分区的数据 , 就无法放置到一个批处理中, 而老版本中轮询方案 , 就会导致一批数据被分到多个小的批次中 , 从而影响效率 , 故在新版本中 , 采用这种粘性的划分策略。文章来源地址https://www.yii666.com/blog/358380.html网址:yii666.com<
例如:
第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进 行使用(如果还是0会继续随机)。
没有指明partition值,但有key的情况下,将key的hash值与topic的 partition数进行取余得到partition值。 Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
注意: 如果 key 一直不变,同一个 key 算出来的 hash 值是个固定值。如果是固定值,这种 hash 取模就没有意义。
例如:
key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那 么key1** 对应的value1写入1号分区,key2对应的value2写入0号分区。
以上两种构造都会通过DefaultPartitioner进行数据分发操作。但指定分区后,不会调用DefaultPartitioner.partition() 方法。
指明partition的情况下,直接将指明的值作为partition值; 例如partition=0,所有数据写入分区0。文章来源地址:https://www.yii666.com/blog/358380.html
自定义分区策略 跟DefaultPartitioner实现方式一样。
1、创建一个类,实现Partitioner接口。
2、重写 partitioner中的方法,
partitioner()方法的参数说明:
参数1:topic
参数2:key值
参数3:key值字节数组
参数4:value数据
参数5:value数据的字节数组
参数6:集群对象
3、在 partitioner() 方法中编写自定义分区逻辑,返回分区编号。
4、在生产者配置信息中进行配置自定义分区:
spring.kafka.producer.properties.partitioner.class=配置类全路径
代码示例:
- @Component
- public class MyPartitioner implements Partitioner {
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes,
- Object value, byte[] valueBytes, Cluster cluster) {
- String msgValues = value.toString();
- int partition;
- if (msgValues.contains("test")){
- partition = 0;
- }else {
- partition = 1;
- }
- return partition;
- }
- @Override
- public void close() {
- //Nothing to close
- }
- @Override
- public void configure(Map<String, ?> configs) {
-
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。