当前位置:   article > 正文

Kafka(八) 使用JAVA代码编写生产者向kafka发送数据,编写消费者消费数据_java中kafka生产者发送一条数据库记录

java中kafka生产者发送一条数据库记录

一、使用 Java 向 Kafka 发送数据

先在服务器端启动消费者监听:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

再运行下边生产者代码我们在上边的服务器端会收到JAVA客户端发送的数据

添加pom依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.3.0</version>
  5. </dependency>

生产者代码

  1. package kafaka;
  2. import org.apache.kafka.clients.producer.Callback;
  3. import org.apache.kafka.clients.producer.KafkaProducer;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.clients.producer.RecordMetadata;
  6. import java.util.Properties;
  7. public class ProducerHelloworld {
  8. public static void main(String[] args) {
  9. // 1. 创建用于连接Kafka的Properties配置
  10. String topic = "test";
  11. Properties props = new Properties();
  12. props.put("bootstrap.servers", "192.168.20.101:9092");
  13. props.put("acks", "all");
  14. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  15. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  16. // 2. 创建一个生产者对象KafkaProducer
  17. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
  18. // 二、带回调函数异步方式
  19. producer.send(new ProducerRecord<String, String>("test", null, "wjm"), new Callback() {
  20. public void onCompletion(RecordMetadata metadata, Exception exception) {
  21. if(exception != null) {
  22. System.out.println("发送消息出现异常");
  23. }
  24. else {
  25. String topic = metadata.topic();
  26. int partition = metadata.partition();
  27. long offset = metadata.offset();
  28. System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");
  29. }
  30. }
  31. });
  32. // 5. 关闭生产者
  33. producer.close();
  34. }
  35. }

二。使用JAVA编写消费者

建立消费者代码

  1. package kafaka;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.clients.producer.KafkaProducer;
  7. import org.apache.kafka.clients.producer.ProducerConfig;
  8. import org.apache.kafka.clients.producer.ProducerRecord;
  9. import org.apache.kafka.common.serialization.StringDeserializer;
  10. import org.apache.kafka.common.serialization.StringSerializer;
  11. import java.util.Collections;
  12. import java.util.Properties;
  13. import java.util.Random;
  14. public class KafaTest {
  15. public static String topic = "test";
  16. public static void main(String[] args) {
  17. new Thread(()-> new Producer().execute()).start();
  18. new Thread(()-> new Consumer().execute()).start();
  19. }
  20. public static class Consumer {
  21. private void execute() {
  22. Properties p = new Properties();
  23. p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.20.101:9092");
  24. p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  25. p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  26. p.put(ConsumerConfig.GROUP_ID_CONFIG, topic);
  27. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p);
  28. // 订阅消息
  29. kafkaConsumer.subscribe(Collections.singletonList(topic));
  30. while (true) {
  31. ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
  32. for (ConsumerRecord<String, String> record : records) {
  33. System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
  34. record.topic(), record.offset(), record.value()));
  35. }
  36. }
  37. }
  38. }
  39. public static class Producer {
  40. private void execute() {
  41. Properties p = new Properties();
  42. //kafka地址,多个地址用逗号分割
  43. p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.20.101:9092");
  44. p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  45. p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  46. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
  47. try {
  48. while (true) {
  49. String msg = "Hello," + new Random().nextInt(100);
  50. ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
  51. kafkaProducer.send(record);
  52. System.out.println("消息发送成功:" + msg);
  53. Thread.sleep(500);
  54. }
  55. } catch (InterruptedException e) {
  56. e.printStackTrace();
  57. } finally {
  58. kafkaProducer.close();
  59. }
  60. }
  61. }
  62. }

 首先我们运行消费者代码,接着在服务器上

使用生产者命令,生产数据kafka-console-producer.sh --broker-list localhost:9092 --topic test

发送数据

最后在代码的console端可以看到在服务器上发送的数据

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/958864
推荐阅读
相关标签
  

闽ICP备14008679号