赞
踩
依赖包
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
代码示例
注:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
package com.xz.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @author: xz
* @since: 2023/4/2 14:10
* @description: 生产者带回调函数的异步发送消息
*/
public class CustomProducerCallback {
public static void main(String[] args) throws InterruptedException {
//1、创建 kafka 生产者的配置对象
Properties properties = new Properties();
//2、给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
//3、指定对应的key和value的序列化类型 key.serializer value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//4、创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//5、调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null){
System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
}
}
});
Thread.sleep(2);
}
//6、关闭资源
kafkaProducer.close();
}
}
在kafka集群某一台服务器上开启 Kafka 消费者
[root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.27:9092 --topic news
观察开启 Kafka 消费者的服务器中是否接收到消息。如下图所示:
在 IDEA 控制台观察回调信息,控制台如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。