当前位置:   article > 正文

Flink消费kafka中的数据(scala版)_编写 scala 代码,使用 flink 消费 kafka 中 topic 为 orde r 的数

编写 scala 代码,使用 flink 消费 kafka 中 topic 为 orde r 的数 据并进行相应的

kafka依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
  4. <version>1.10.1</version>
  5. </dependency>

核心代码:

  1. import org.apache.flink.api.common.serialization.SimpleStringSchema
  2. import org.apache.flink.streaming.api.scala._
  3. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
  4. import java.util.Properties
  5. /**
  6. * @author shkstart
  7. * @create 2021-07-23 8:06
  8. */
  9. object Test {
  10. def main(args: Array[String]): Unit = {
  11. //1.创建运行环境
  12. val env = StreamExecutionEnvironment.getExecutionEnvironment
  13. //2.读取kafka中的数据
  14. //2.1创建配置对象,这里的对象为java的对象
  15. val properties = new Properties()
  16. properties.setProperty("bootstrap.servers","hadoop101:9092") //hadoop101为自己的kafka地址
  17. properties.setProperty("group.id","consumer-group")
  18. val kafavalue = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
  19. //打印
  20. kafavalue.print()
  21. //执行
  22. env.execute()
  23. }
  24. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/514536
推荐阅读
相关标签
  

闽ICP备14008679号