赞
踩
客户端版本gradle引用:compile "org.apache.kafka:kafka-clients:0.10.2.1"
org.apache.kafka.clients:producer和consumer相关类
org.apache.kafka.common:公共模块
org.apache.kafka.server.policy :只有一个类,创建一个topic强制指定的策略类 .
org.apache.kafka:kafka-clients:0.10.21==>>org.apache.kafka.clients.producer.KafkaProducer.send()//359行
1.拦截器过滤消息
org.apache.kafka.clients.producer.internals.ProducerInterceptor.onSend()//57行
2.first make sure the metadata for the topic is available,同步集群元数据
3.序列化 key和value,序列化方法和编码都可以配置
4.获取分区
org.apache.kafka:kafka-clients:0.10.21==>>org.apache.kafka.clients.producer.KafkaProducer.partition()//470行
指定分区,直接返回
未指定分区,指定分区序列化key:Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
未指定分区,未指定分区序列化key
轮询分区数字,然后通过可达分区(集群可达分区数,可变)取余数:Utils.toPositive(nextValue) % availablePartitions.size()
没有可达分区:Utils.toPositive(nextValue) % numPartitions
5.alidate that the record size is not too large
6.将记录放到消息记录收集器里
org.apache.kafka.clients.producer.internals.RecordAccumulator.append()//160行
7.如果改批次已经满了,新建一个批次然后将记录追加到批次里;如果一个批次没有,新建一个批次然后将记录追加到批次里
if (result.batchIsFull || result.newBatchCreated) {
//唤醒发送线程
this.sender.wakeup();
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。