赞
踩
1)需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
导入依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
代码示例:
package com.apache.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class CustomProducer { public static void main(String[] args) { //配置 Properties properties = new Properties(); //连接集群 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092"); //指定对应key value序列化类型 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //创建kafka生产者 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); //发送数据 for (int i = 0; i <= 5; i++) { kafkaProducer.send(new ProducerRecord<>("first", "org" + i)); } //关闭资源 kafkaProducer.close(); } }
测试:在hadoop103 下输入消费者命令
[root@hadoop103 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
在idea执行代码,查看
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
基于异步代码,示例如下:
//添加回调函数
kafkaProducer.send(new ProducerRecord<>("first", "org" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
//查看exception是否为空,如果为空,则打印成功并返回信息,如果失败则不打印,依旧返回信息
if (exception == null) {
System.out.println("主题:" + recordMetadata.topic() + "\t 分区:" + recordMetadata.partition());
}
}
});
整体代码如下:
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试
package com.apache.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class CustomProducerCallback { public static void main(String[] args) { //配置 Properties properties = new Properties(); //连接集群 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092"); //指定对应key value序列化类型 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //创建kafka生产者 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); //发送数据 for (int i = 0; i <= 5; i++) { //添加回调函数 kafkaProducer.send(new ProducerRecord<>("first", "org" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) { //查看exception是否为空,如果为空,则打印成功并返回信息,如果失败则不打印,依旧返回信息 if (exception == null) { System.out.println("主题:" + recordMetadata.topic() + "\t 分区:" + recordMetadata.partition()); } } }); } //关闭资源 kafkaProducer.close(); } }
测试后,idea 控制台打印如下:
分区0 是因为创建Topic时,只创建了1个分区,所以 才会一直是分区 0
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。