A worked example of Flink 1.9.1 (Scala 2.12) consuming from Kafka 2.4.0, and fixing "can't resolve address: flink:9092"

The error "can't resolve address: flink:9092" means the client received a broker address it cannot resolve: the broker advertises the hostname flink, but the machine running the Flink job has no DNS or hosts entry for it. The fix is to make the advertised address resolvable from the client.
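Two common fixes, sketched below under the assumption that the broker host is hadoop100 as in the code later in this post (the IP address is a placeholder; substitute your broker's real address):

  # Option 1: on the machine running the Flink job, map the advertised hostname in /etc/hosts
  192.168.1.100   flink            # placeholder IP; use the broker's real address

  # Option 2: in the broker's config/server.properties, advertise a resolvable hostname
  advertised.listeners=PLAINTEXT://hadoop100:9092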

1. Add the required dependencies

  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.12</artifactId>
      <version>1.9.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.12</artifactId>
      <version>1.9.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>1.9.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.9.1</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
  <!-- kafka_2.12 rather than kafka_2.13, so the artifact matches the project's Scala 2.12 -->
  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>2.4.0</version>
  </dependency>
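For readers building with sbt rather than Maven, here is a minimal equivalent sketch (assuming Scala 2.12.x; the %% operator appends the Scala version suffix automatically):

  // build.sbt -- equivalent dependency declarations (sketch)
  scalaVersion := "2.12.10"

  libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala"           % "1.9.1",
    "org.apache.flink" %% "flink-streaming-scala" % "1.9.1",
    "org.apache.flink" %% "flink-connector-kafka" % "1.9.1",
    "org.apache.flink" %  "flink-core"            % "1.9.1",
    "org.apache.kafka" %% "kafka"                 % "2.4.0"
  )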

2. Create the Scala class and write the code

  package com.vincer

  import java.util.Properties

  import org.apache.flink.api.common.serialization.SimpleStringSchema
  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
  // Implicit TypeInformation conversions required by map/flatMap
  import org.apache.flink.api.scala._

  /**
   * @Package com.vincer
   * @ClassName conkafka
   * @Author Vincer
   * @Date 2020/01/13 22:31
   * @ProjectName kafka-flink
   */
  object conkafka {
    def main(args: Array[String]): Unit = {
      // Kafka connection properties
      val kafkaProps = new Properties()
      kafkaProps.setProperty("bootstrap.servers", "hadoop100:9092")
      // Consumer group this consumer belongs to
      kafkaProps.setProperty("group.id", "group_test")

      // Get the current execution environment
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      // Checkpoint interval in milliseconds
      env.enableCheckpointing(5000)

      // Kafka consumer; "test1" is the topic to consume
      val kafkaSource = new FlinkKafkaConsumer[String]("test1", new SimpleStringSchema, kafkaProps)
      // Start from the latest offset instead of the committed one
      //kafkaSource.setStartFromLatest()
      // Commit offsets back to Kafka when a checkpoint completes
      kafkaSource.setCommitOffsetsOnCheckpoints(true)

      // Add the consumer as a source and read it with parallelism 3
      val stream = env.addSource(kafkaSource).setParallelism(3)
      stream.print()

      // Launch the job
      env.execute("kafkawd")
    }
  }
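The wildcard import org.apache.flink.api.scala._ pays off as soon as the stream is transformed. The job name "kafkawd" suggests a word count, so as a hypothetical next step (not part of the original code), print() could be replaced with a per-word running count:

  // Hypothetical word-count extension; relies on the implicit conversions
  // from org.apache.flink.api.scala._ for flatMap/map
  val counts = stream
    .flatMap(_.toLowerCase.split("\\s+"))
    .filter(_.nonEmpty)
    .map(word => (word, 1))
    .keyBy(0) // key by tuple field 0, the word
    .sum(1)   // running sum of tuple field 1, the count
  counts.print()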

3. Start ZooKeeper and Kafka

(steps omitted)
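For reference, the standard single-node quickstart commands, run from the Kafka installation directory (this assumes the ZooKeeper bundled with Kafka; adjust paths for your installation):

  bin/zookeeper-server-start.sh config/zookeeper.properties
  bin/kafka-server-start.sh config/server.properties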

4. Start a Kafka console producer to generate data

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
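If the topic does not exist yet, create it first; a console consumer is also a quick way to verify the broker end to end before involving Flink. Both commands below are sketches assuming a single local broker; 3 partitions matches the source parallelism in the code above:

  bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test1
  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning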

5. Run the code

6. Produce messages with the console producer; they are printed to the IDEA console
