
Kafka: producing data to and consuming data from Kafka in code

(The examples below use the Flink Kafka connector and the Kafka client; the code is written in Scala.)


1. Add the required dependency before writing any code

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
```
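Since the source example below uses Flink's Scala API (`org.apache.flink.streaming.api.scala._`), the Scala streaming dependency is presumably needed as well. A minimal sketch, assuming a Scala 2.12 build and the same `${flink.version}` property:

```xml
<!-- assumption: Scala 2.12 build; adjust the suffix to your Scala version -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
```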

2. Reading data from Kafka with a Flink KafkaSource

```scala
package com.wt.flink.source

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

object Demo5KafkaSource {
  def main(args: Array[String]): Unit = {
    // create the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
     * build the Kafka source
     */
    val source: KafkaSource[String] = KafkaSource
      .builder[String]
      .setBootstrapServers("master:9092,node1:9092,node2:9092") // list of Kafka brokers
      .setTopics("test_topic2")                                 // topic to read
      .setGroupId("my_group")                                   // consumer group: a record is consumed only once within a group
      .setStartingOffsets(OffsetsInitializer.earliest())        // where to start reading: earliest = all data, latest = only new data
      .setValueOnlyDeserializer(new SimpleStringSchema())       // deserializer for the value
      .build()

    // use the Kafka source
    val kafkaDS: DataStream[String] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka Source")

    kafkaDS.print()
    env.execute()
  }
}
```
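Besides `earliest()` and `latest()`, the connector can also resume from offsets already committed by the consumer group and fall back when none exist. A minimal sketch of that variant, meant to drop into `Demo5KafkaSource` above; only the `setStartingOffsets` line and the extra import change:

```scala
import org.apache.kafka.clients.consumer.OffsetResetStrategy

// resume from the group's committed offsets; fall back to the earliest offset
// when the group has not committed anything yet
val resumingSource: KafkaSource[String] = KafkaSource
  .builder[String]
  .setBootstrapServers("master:9092,node1:9092,node2:9092")
  .setTopics("test_topic2")
  .setGroupId("my_group")
  .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build()
```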

3. Writing data into Kafka with the producer client

```scala
package com.wt.flink.kafka

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties

object Demo1KafkaProducer {
  def main(args: Array[String]): Unit = {
    /**
     * 1. create the producer
     */
    val properties = new Properties()
    // Kafka broker addresses
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    // serializer classes for the key and the value
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](properties)
    val record = new ProducerRecord[String, String]("test_topic2", "woaini,zhongguo")

    // send the record to Kafka
    producer.send(record)
    producer.flush()

    // close the producer
    producer.close()
  }
}
```
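The record above carries no key, so (as noted in section 5) the key comes back as `null` and the client picks the partition itself. A minimal sketch of sending a keyed record with a completion callback, assuming the same producer instance; the key `"key1"` is only an illustrative placeholder, and records with the same key always land in the same partition:

```scala
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

// a keyed record: the key determines the target partition
val keyedRecord = new ProducerRecord[String, String]("test_topic2", "key1", "woaini,zhongguo")

// the callback reports the partition and offset once the broker acknowledges the write
producer.send(keyedRecord, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) exception.printStackTrace()
    else println(s"written to partition ${metadata.partition()} at offset ${metadata.offset()}")
  }
})
```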

4. Writing student records to Kafka in bulk

```scala
package com.wt.flink.kafka

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Properties
import scala.io.Source

object Demo2StudentToKafka {
  def main(args: Array[String]): Unit = {
    /**
     * create the producer
     */
    val properties = new Properties()
    // Kafka broker addresses
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    // serializer classes for the key and the value
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](properties)

    /**
     * write the student table to Kafka in bulk
     */
    val studentList: List[String] = Source.fromFile("data/students.txt").getLines().toList

    // send the records to Kafka
    for (student <- studentList) {
      val record = new ProducerRecord[String, String]("student", student)
      producer.send(record)
      producer.flush()
    }

    producer.close()
  }
}
```
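Calling `flush()` after every `send()` defeats the producer's batching; flushing once after the loop is enough. A minimal variant of the loop, additionally keying each record by the student id under the assumption that `students.txt` is comma-separated with the id as the first field (adjust the split if your file differs):

```scala
// sketch: flush once at the end instead of per record
// assumption: each line is "id,name,..." so the first field is the student id
for (student <- studentList) {
  val id: String = student.split(",")(0)
  producer.send(new ProducerRecord[String, String]("student", id, student))
}
producer.flush()
producer.close()
```

Keying by id keeps all records of one student in the same partition, which preserves their ordering for downstream consumers.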

5. Consuming data from Kafka in a polling loop

```scala
package com.wt.flink.kafka

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import java.time.Duration
import java.util.Properties
import java.{lang, util}

object Demo3KafkaConsumer {
  def main(args: Array[String]): Unit = {
    /**
     * 1. create the consumer
     */
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    // deserializer classes for the key and the value
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    /**
     * auto.offset.reset:
     * earliest - if the group has committed offsets, resume from them; otherwise start from the beginning
     * latest (default) - if the group has committed offsets, resume from them; otherwise consume only data produced after the consumer starts
     * none - if every partition has a committed offset, resume from it; if any partition has none, throw an exception
     */
    properties.setProperty("auto.offset.reset", "earliest")
    // consumer group
    properties.setProperty("group.id", "suibian_mingzi")

    val consumer = new KafkaConsumer[String, String](properties)

    /**
     * 2. subscribe to a topic; several topics can be subscribed at once
     */
    val topics = new util.ArrayList[String]()
    topics.add("student")
    consumer.subscribe(topics)

    while (true) {
      println("polling...")

      /**
       * poll for data; a timeout must be given
       */
      val consumerRecords: ConsumerRecords[String, String] = consumer
        .poll(Duration.ofSeconds(2))

      // parse the records
      val records: lang.Iterable[ConsumerRecord[String, String]] = consumerRecords.records("student")
      val iterRecord: util.Iterator[ConsumerRecord[String, String]] = records.iterator()
      while (iterRecord.hasNext) {
        // take one record
        val record: ConsumerRecord[String, String] = iterRecord.next()
        val topic: String = record.topic()   // topic
        val offset: Long = record.offset()   // offset of the record
        val key: String = record.key()       // key of the record; null if none was set
        val value: String = record.value()   // the data itself
        val ts: Long = record.timestamp()    // timestamp, by default the time the record was written
        println(s"$topic\t$offset\t$key\t$value\t$ts")
      }
    }

    // close the consumer (not reached while the loop above runs forever)
    consumer.close()
  }
}
```
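When the consumer is subscribed to several topics, looping per topic via `records("student")` gets clumsy; `ConsumerRecords` is itself iterable over every record returned by the poll. A minimal sketch of the inner loop using the Java/Scala converters, assuming a Scala 2.12 build (hence `scala.collection.JavaConverters`):

```scala
import scala.collection.JavaConverters._

// iterate over every record from the poll, regardless of which topic it came from
for (record <- consumerRecords.asScala) {
  println(s"${record.topic()}\t${record.offset()}\t${record.key()}\t${record.value()}\t${record.timestamp()}")
}
```

With `enable.auto.commit` left at its default of `true`, the client commits offsets periodically in the background, which is why restarting this consumer with the same `group.id` resumes where it left off.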