当前位置:   article > 正文

flink本地运行及访问webui_flink web ui 本地

flink web ui 本地

1 webui的依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-runtime-web_2.11</artifactId>
  4. <version>${flink.version}</version>
  5. <scope>${scope.type}</scope>
  6. </dependency>

-

2 本地运行代码

  1. import org.apache.flink.api.common.eventtime.WatermarkStrategy
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema
  3. import org.apache.flink.configuration.{Configuration, RestOptions}
  4. import org.apache.flink.connector.kafka.source.KafkaSource
  5. import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
  6. import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
  7. import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
  8. object DataStreamReadKafka {
  9. def main(args: Array[String]): Unit = {
  10. // 创建带webui的本地执行环境
  11. val conf = new Configuration()
  12. conf.setString(RestOptions.BIND_PORT, "8081") // 指定访问端口
  13. val env = StreamExecutionEnvironment
  14. .createLocalEnvironmentWithWebUI(conf)
  15. env.setParallelism(1)
  16. val tab_env = StreamTableEnvironment.create(env)
  17. // 读取kafka数据
  18. val topics = "test,test_2".split(",").toList
  19. val source = KafkaSource
  20. .builder[String]
  21. .setBootstrapServers("localhost:9092")
  22. .setTopics(topics: _*)
  23. .setGroupId("shy_test")
  24. .setStartingOffsets(OffsetsInitializer.earliest)
  25. .setValueOnlyDeserializer(new SimpleStringSchema())
  26. .build
  27. val input_stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka_source")
  28. input_stream.print()
  29. // 启动执行环境
  30. env.execute()
  31. }
  32. }

3 访问webui

http://localhost:8081/

注意: 每个任务指定一个端口访问webUI

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/772675
推荐阅读
相关标签
  

闽ICP备14008679号