当前位置:   article > 正文

kafka send方法小总结

kafka send

kafka send方法小总结

1、kafka的sen默认发送方式应该是同步,而非异步
public ApacheKafkaClient(String kafkaServers, boolean sync) throws Exception {
super(kafkaServers, sync);
}
查看父类VersionKafkaClient
在这里插入图片描述

2、kafka调用send方法有3种方式
//单条发送
public void send(String topic, String message) throws Exception {
if (topic != null && message != null) {
ProducerRecord<String, String> record = new ProducerRecord(topic, message);
if (this.sync) {
this.producer.send(record).get();
} else {
this.producer.send(record);
}

}
  • 1

}
//批量发送
public void send(String topic, Collection messages) throws Exception {
Iterator var3 = messages.iterator();

while(var3.hasNext()) {
    String message = (String)var3.next();
    this.send(topic, message);
}
  • 1
  • 2
  • 3
  • 4

}
//
public void sendAllPartition(String topic, String message) throws Exception {
List partitionInfoList = this.producer.partitionsFor(topic);
Iterator var4 = partitionInfoList.iterator();

while(var4.hasNext()) {
    PartitionInfo partitionInfo = (PartitionInfo)var4.next();
    ProducerRecord<String, String> record = new ProducerRecord(topic, partitionInfo.partition(), "", message);
    this.producer.send(record);
}
  • 1
  • 2
  • 3
  • 4
  • 5

}

3、此处Properties这个类有意思

protected void init(String kafkaServers, boolean sync) throws Exception {
Properties props = new Properties();
props.put(“bootstrap.servers”, kafkaServers);
props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
this.producer = new KafkaProducer(props);
}

class Properties extends Hashtable<Object,Object> {}
继承的是Hashtable

深夜看代码 真得劲 越看越兴奋

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/839415
推荐阅读
相关标签
  

闽ICP备14008679号