当前位置:   article > 正文

Java编程操作Kafka_maven kafka包

maven kafka包

同步生产消息到Kafka

接下来,我们将编写Java程序,将1-100的数字消息写入到Kafka中。

导入Maven Kafka POM依赖

  1. <!-- kafka客户端工具 -->
  2. <dependency>
  3. <groupId>org.apache.kafka</groupId>
  4. <artifactId>kafka-clients</artifactId>
  5. <version>2.4.1</version>
  6. </dependency>

创建KafkaProducerTest类。

可以参考以下方式来编写第一个Kafka示例程序

1.创建用于连接Kafka的Properties配置

2.创建一个生产者对象KafkaProducer

3.调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值

4.再调用一个Future.get()方法等待响应

5.关闭生产者

参考代码:

  1. import java.util.Properties;
  2. import java.util.concurrent.ExecutionException;
  3. import java.util.concurrent.Future;
  4. import org.apache.kafka.clients.producer.KafkaProducer;
  5. import org.apache.kafka.clients.producer.ProducerRecord;
  6. import org.apache.kafka.clients.producer.RecordMetadata;
  7. public class KafkaProducerTest {
  8. public static void main(String[] args) {
  9. // 1. 创建用于连接Kafka的Properties配置
  10. Properties props=new Properties();
  11. props.put("bootstrap.servers","192.168.2.3:9092");
  12. props.put("acks","all");
  13. props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
  14. props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
  15. // 2. 创建一个生产者对象KafkaProducer
  16. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
  17. // 3. 调用send发送1-100消息到指定Topic test
  18. for(int i = 0; i < 100; ++i) {
  19. try {
  20. // 获取返回值Future,该对象封装了返回值
  21. Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
  22. // 调用一个Future.get()方法等待响应
  23. future.get();
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. } catch (ExecutionException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. // 5. 关闭生产者
  31. producer.close();
  32. }
  33. }

从Kafka的topic中消费消息

从 test topic中,将消息都消费,并将记录的offset、key、value打印出来

创建KafkaConsumerTest类

1.创建Kafka消费者配置

  1. Properties props = new Properties();
  2. props.setProperty("bootstrap.servers", "192.168.2.3:9092");
  3. props.setProperty("group.id", "test");
  4. props.setProperty("enable.auto.commit", "true");
  5. props.setProperty("auto.commit.interval.ms", "1000");
  6. props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  7. props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

2.创建Kafka消费者

3.订阅要消费的主题

4.使用一个while循环,不断从Kafka的topic中拉取消息

5.将将记录(record)的offset、key、value都打印出来

参考代码

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import org.apache.kafka.clients.producer.RecordMetadata;
  4. import java.util.Properties;
  5. import java.util.concurrent.ExecutionException;
  6. import java.util.concurrent.Future;
  7. public class KafkaConsumerTest {
  8. public static void main(String[] args) {
  9. // 1. 创建用于连接Kafka的Properties配置
  10. Properties props = new Properties();
  11. props.put("bootstrap.servers", "192.168.2.3:9092");
  12. props.put("group.id", "test-group");
  13. props.put("key.deserializer", StringDeserializer.class.getName()); // 设置键的反序列化器
  14. props.put("value.deserializer", StringDeserializer.class.getName()); // 设置值的反序列化器
  15. // 2. 创建一个消费者对象 KafkaConsumer
  16. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  17. // 3. 订阅主题
  18. consumer.subscribe(Collections.singletonList("test"));
  19. // 4. 拉取消息
  20. try {
  21. while (true) {
  22. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  23. records.forEach(record -> {
  24. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  25. });
  26. }
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. } finally {
  30. // 5. 关闭消费者
  31. consumer.close();
  32. }
  33. }
  34. }

异步使用带有回调函数方法生产消息

如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。

需求:

  1. 在发送消息出现异常时,能够及时打印出异常信息
  2. 在发送消息成功时,打印Kafka的topic名字、分区id、offset
  1. import java.util.Properties;
  2. import java.util.concurrent.ExecutionException;
  3. import java.util.concurrent.Future;
  4. import org.apache.kafka.clients.producer.Callback;
  5. import org.apache.kafka.clients.producer.KafkaProducer;
  6. import org.apache.kafka.clients.producer.ProducerRecord;
  7. import org.apache.kafka.clients.producer.RecordMetadata;
  8. public class KafkaProducerTest {
  9. public static void main(String[] args) {
  10. // 1. 创建用于连接Kafka的Properties配置
  11. Properties props = new Properties();
  12. props.put("bootstrap.servers", "192.168.2.3:9092");
  13. props.put("acks", "all");
  14. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  15. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  16. // 2. 创建一个生产者对象KafkaProducer
  17. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
  18. // 3. 调用send发送1-100消息到指定Topic test
  19. for (int i = 0; i < 100; ++i) {
  20. // 一、同步方式
  21. // 获取返回值Future,该对象封装了返回值
  22. // Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
  23. // 调用一个Future.get()方法等待响应
  24. // future.get();
  25. // 二、带回调函数异步方式
  26. producer.send(new ProducerRecord<String, String>("test", null, i + ""), new Callback() {
  27. @Override
  28. public void onCompletion(RecordMetadata metadata, Exception exception) {
  29. if (exception != null) {
  30. System.out.println("发送消息出现异常");
  31. } else {
  32. String topic = metadata.topic();
  33. int partition = metadata.partition();
  34. long offset = metadata.offset();
  35. System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");
  36. }
  37. }
  38. });
  39. }
  40. // 5. 关闭生产者
  41. producer.close();
  42. }
  43. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/985148
推荐阅读
相关标签
  

闽ICP备14008679号