A short summary of the Kafka send method
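1. In this wrapper, send appears to default to synchronous rather than asynchronous mode. (KafkaProducer.send itself is always asynchronous and returns a Future; the wrapper becomes synchronous only by calling get() on that Future.)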
public ApacheKafkaClient(String kafkaServers, boolean sync) throws Exception {
    super(kafkaServers, sync);
}
Now look at the parent class, VersionKafkaClient.
2. The client exposes three ways to send
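// Send a single message
public void send(String topic, String message) throws Exception {
    if (topic != null && message != null) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        if (this.sync) {
            // Synchronous: block until the send completes or fails
            this.producer.send(record).get();
        } else {
            // Asynchronous: return immediately; the returned Future is ignored
            this.producer.send(record);
        }
    }
}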
// Send a batch of messages, one at a time
public void send(String topic, Collection<String> messages) throws Exception {
    for (String message : messages) {
        this.send(topic, message);
    }
}
// Send the same message to every partition of the topic
public void sendAllPartition(String topic, String message) throws Exception {
    List<PartitionInfo> partitionInfoList = this.producer.partitionsFor(topic);
    for (PartitionInfo partitionInfo : partitionInfoList) {
        // Explicit partition number, empty string as the key
        ProducerRecord<String, String> record =
                new ProducerRecord<>(topic, partitionInfo.partition(), "", message);
        // Always asynchronous: the sync flag is not consulted here
        this.producer.send(record);
    }
}
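The difference between the two branches in send comes down to whether the caller waits on the Future that the producer returns. A minimal sketch of that idea using only the JDK follows. fakeSend is a hypothetical stand-in for producer.send(record), not the Kafka API, and the class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

class SyncVsAsyncSketch {

    // Hypothetical stand-in for producer.send(record): returns at once and
    // completes the future later on another thread.
    static CompletableFuture<String> fakeSend(String message) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50); // pretend network round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "ack:" + message;
        });
    }

    // Synchronous style: block on get() until the result is available.
    static String sendSync(String message) {
        try {
            return fakeSend(message).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Asynchronous style: hand the future back without waiting on it.
    static CompletableFuture<String> sendAsync(String message) {
        return fakeSend(message);
    }

    public static void main(String[] args) {
        System.out.println(sendSync("hello"));         // prints "ack:hello"
        CompletableFuture<String> pending = sendAsync("world");
        // The caller is free to do other work here before collecting the result.
        System.out.println(pending.join());            // prints "ack:world"
    }
}
```

In the synchronous style a failure surfaces as an exception at the call site, while in the asynchronous style it stays inside the future until someone inspects it. This is why discarding the result, as the else branch of send does, can hide send errors.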
3. The Properties class used here is interesting
protected void init(String kafkaServers, boolean sync) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaServers);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    this.producer = new KafkaProducer<>(props);
}
class Properties extends Hashtable<Object,Object> {}
It extends Hashtable, which is why put can be called on it directly with arbitrary Object keys and values.
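One consequence of that inheritance is worth seeing in code: put accepts any Object, but getProperty only returns values that are Strings. The sketch below uses only java.util. The class name, the placeholder server address, and the "retries" entry are illustrative assumptions, not taken from the client above.

```java
import java.util.Hashtable;
import java.util.Properties;

class PropertiesSketch {

    // Builds a Properties object the same way init does: via the inherited put().
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("retries", 3);                          // an Integer, which put() allows
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println(props instanceof Hashtable);             // prints "true"
        System.out.println(props.getProperty("bootstrap.servers")); // prints "localhost:9092"
        System.out.println(props.get("retries"));                   // prints "3"
        System.out.println(props.getProperty("retries"));           // prints "null"
    }
}
```

When every value is a String, as in init, setProperty is the type-safe alternative to put.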
Reading code late at night is really satisfying; the more I read, the more excited I get.