Next, we will write a Java program that writes the messages 1-100 into Kafka.
Import the Maven Kafka POM dependency
<!-- Kafka client library -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
Create a KafkaProducerTest class.
You can write your first Kafka example program along the following lines:
1. Create a Properties configuration for connecting to Kafka
2. Create a KafkaProducer producer object
3. Call send to send the messages 1-100 to the topic test, and obtain the returned Future, which wraps the result
4. Call Future.get() to wait for the response
5. Close the producer
Reference code:
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerTest {
    public static void main(String[] args) {
        // 1. Create the Properties configuration for connecting to Kafka
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.2.3:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create a KafkaProducer producer object
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // 3. Call send to send the messages 1-100 to the topic test
        for (int i = 0; i < 100; ++i) {
            try {
                // Obtain the returned Future, which wraps the result of the send
                Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
                // 4. Call Future.get() to wait for the response
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        // 5. Close the producer
        producer.close();
    }
}
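The same producer can also be written a little more defensively. The sketch below is only an illustrative variant of the reference code above, not part of the original example: the class name KafkaProducerConfigSketch is made up here, it uses the ProducerConfig and StringSerializer constants instead of raw configuration strings so typos are caught at compile time, and try-with-resources so the producer is closed even if a send fails.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerConfigSketch {
    public static void main(String[] args) throws Exception {
        // Same settings as above, expressed with ProducerConfig constants
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.3:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources closes the producer even if a send throws
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; ++i) {
                // Block on the Future so each message is confirmed before the next send
                producer.send(new ProducerRecord<>("test", null, i + "")).get();
            }
        }
    }
}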
Consume all messages from the test topic, and print each record's offset, key, and value.
Create a KafkaConsumerTest class
1. Create the Kafka consumer configuration
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.2.3:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
2. Create the Kafka consumer
3. Subscribe to the topic to consume
4. Use a while loop to continuously poll messages from the Kafka topic
5. Print each record's offset, key, and value
Reference code:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        // 1. Create the Properties configuration for connecting to Kafka
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.2.3:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", StringDeserializer.class.getName());   // key deserializer
        props.put("value.deserializer", StringDeserializer.class.getName()); // value deserializer

        // 2. Create a KafkaConsumer consumer object
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 3. Subscribe to the topic
        consumer.subscribe(Collections.singletonList("test"));

        // 4. Poll messages
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 5. Close the consumer
            consumer.close();
        }
    }
}
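The configuration in step 1 mentions enable.auto.commit and auto.commit.interval.ms; the reference code above simply relies on the defaults (auto-commit enabled, committing every 5 seconds). If you want offsets committed only after the records have actually been processed, auto-commit can be turned off and commitSync() called explicitly. The following is a minimal sketch of that variant, assuming the same broker address, topic, and group id as above; the class name KafkaManualCommitConsumerTest is made up for this illustration.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaManualCommitConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // Disable auto-commit so offsets are committed only after processing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
                // Commit the offsets of the records returned by the last poll
                consumer.commitSync();
            }
        }
    }
}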
If we want to know whether a message was produced successfully, or to run some other action after a message has been written to Kafka, it is convenient to send the message with a callback.
Requirement: send the messages 1-100 asynchronously, and use a callback to report whether each send succeeded. Reference code:
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerTest {
    public static void main(String[] args) {
        // 1. Create the Properties configuration for connecting to Kafka
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.2.3:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create a KafkaProducer producer object
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // 3. Call send to send the messages 1-100 to the topic test
        for (int i = 0; i < 100; ++i) {
            // Option 1: synchronous send
            // Obtain the returned Future, which wraps the result of the send
            // Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
            // Call Future.get() to wait for the response
            // future.get();

            // Option 2: asynchronous send with a callback
            producer.send(new ProducerRecord<String, String>("test", null, i + ""), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("An exception occurred while sending the message");
                    } else {
                        String topic = metadata.topic();
                        int partition = metadata.partition();
                        long offset = metadata.offset();

                        System.out.println("Successfully sent message to Kafka: topic " + topic + ", partition " + partition + ", offset " + offset);
                    }
                }
            });
        }

        // 5. Close the producer
        producer.close();
    }
}
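Callback has a single abstract method, so on Java 8 or later the anonymous class above can be replaced with a lambda. Below is a minimal self-contained sketch of that variant, assuming the same broker address and topic as above; the class name KafkaProducerCallbackLambdaTest is made up for this illustration. Note that producer.close() blocks until previously sent records are delivered, so the callbacks still fire for every message.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerCallbackLambdaTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.2.3:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; ++i) {
                // The lambda runs on the producer's I/O thread once the broker
                // acknowledges (or rejects) the record, so keep it short.
                producer.send(new ProducerRecord<>("test", null, i + ""), (metadata, exception) -> {
                    if (exception != null) {
                        System.out.println("An exception occurred while sending the message: " + exception.getMessage());
                    } else {
                        System.out.printf("Sent to topic %s, partition %d, offset %d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });
            }
            // close() is called automatically here and flushes any buffered records
        }
    }
}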