赞
踩
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer{
public static void main(String[] args) throws InterruptedException{
// 1.创建Kafka生产者的配置对象
Properties properties = new Properties();
// 2.给Kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");
// key,value序列化:key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 3.创建kafka生产者对象
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
// 4.调用send方法,发送消息
for (int i = 0; i < 5; i++){
kafkaProducer.send(new ProducerRecord<>("first","testMessage"+i));
}
// 5.关闭资源
kafkaProducer.close();
}
}
回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallback {
public static void main(String[] args) throws InterruptedException {
// 1.创建kafka生产者的配置对象
Properties properties = new Properties();
// 2.给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");
// key,value序列化:key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 3.创建kafka生产者对象
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4.调用send方法,发送消息
for (int i=0; i<5; i++){
// 添加回调
kafkaProducer.send(new ProducerRecord<>("first","testMessage"+i),new Callback(){
// 该方法在Producer收到ack时调用,为异步调用
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
// 没有异常,输出信息到控制台
System.out.println("主题: "+recordMetadata.topic() + "->" + "分区: "+recordMetadata.partition());
} else {
// 出现异常打印
e.printStackTrace();
}
}
});
// 延迟一会会看到数据发往不同分区
Thread.sleep(2);
}
// 5.关闭资源
kafkaProducer.close();
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。