当前位置:   article > 正文

大数据——Flink 入门程序(wordcount)_flink如何运行wordcount.jar

flink如何运行wordcount.jar

目录

 一、编程模型

二、编程步骤

三、DataStream 实时 wordcount

​四、DataSet 离线wordcount


 一、编程模型

Flink提供了不同级别的编程抽象,通过调用抽象的数据集调用算子构建DataFlow就可以实现对分布式的数据进行流式计算和离线计算,DataSet是批处理的抽象数据集,DataStream是流式计算的抽象数据集,他们的方法都分别为Source、Transformation、Sink

  • Source主要负责数据的读取
  • Transformation主要负责对数据的转换操作
  • Sink负责最终计算好的结果数据输出。

二、编程步骤

  • 创建执行环境 Environment
  • 加载数据源 Source
  • 转换数据 Transformation
  • 数据输出
  • 执行程序

三、DataStream 实时 wordcount

3.1 本地执行

  1. package cn.kgc.datasteam
  2. //创建完执行环境后,将StreamExcutionEnvironment替换成_
  3. import org.apache.flink.streaming.api.scala._
  4. object WordCount {
  5. def main(args: Array[String]): Unit = {
  6. //创建执行环境
  7. val env = StreamExecutionEnvironment.getExecutionEnvironment
  8. //设置并行度为2,默认为本地core核数
  9. env.setParallelism(2)
  10. //读取数据 source
  11. //通过侦听hadoop101的1234端口采集数据
  12. val inputStream = env.socketTextStream("hadoop101", 1234)
  13. //转换数据 transformation
  14. //将接收到的内容按空格拆分后展平
  15. val result = inputStream.flatMap(_.split(" "))
  16. .map((_, 1))
  17. //根据单词分组聚合
  18. .keyBy(_._1)
  19. .sum(1)
  20. //结果输出
  21. result.print()
  22. //执行程序
  23. env.execute()
  24. }
  25. }

3.2上传至虚拟机执行代码

  1. package cn.kgc.datasteam
  2. import org.apache.flink.api.java.utils.ParameterTool
  3. import org.apache.flink.streaming.api.scala._
  4. object WordCountRunOnVM {
  5. def main(args: Array[String]): Unit = {
  6. //创建执行环境
  7. val env = StreamExecutionEnvironment.getExecutionEnvironment
  8. //通过参数的方式传入host主机名和port端口号
  9. val tool = ParameterTool.fromArgs(args)
  10. val host = tool.get("host")
  11. val port = tool.getInt("port")
  12. //读取数据源
  13. val inputStream = env.socketTextStream(host, port)
  14. //转换数据
  15. val result = inputStream.flatMap(_.split(" "))
  16. .map((_, 1))
  17. .keyBy(_._1)
  18. .sum(1)
  19. //结果输出
  20. result.print()
  21. //执行程序
  22. env.execute()
  23. }
  24. }

操作步骤

1. 生成 jar 包

 2. 在虚拟机中启动start-cluser.sh服务

start-cluster.sh

3. 开启1234端口侦听

nc -lk 1234

4.  登录master:8081页面,上传 jar 包

 

四、DataSet 离线wordcount

  1. package cn.kgc.dataset
  2. import org.apache.flink.api.scala._
  3. object WordCount {
  4. def main(args: Array[String]): Unit = {
  5. //DataSet批处理创建环境的方式 : ExecutionEnvironment.gerExecutionEnvironment
  6. val env = ExecutionEnvironment.getExecutionEnvironment
  7. //获取数据源
  8. val input = env.readTextFile("C:/Users/Administrator/Desktop/flink_project/src/main/resources/wc.txt",
  9. "UTF-8")
  10. //转换数据
  11. val result = input.flatMap(_.split(" "))
  12. .map((_, 1))
  13. //根据单词分组聚合
  14. .groupBy(0)
  15. .sum(1)
  16. //结果输出
  17. result.print()
  18. }
  19. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/944087
推荐阅读
相关标签
  

闽ICP备14008679号