赞
踩
1)每个Flink程序都包含以下的若干流程: (1)获取一个执行环境(Execution Environment) (2)加载、创建初始数据(source) (3)转换这些数据:(Transformation) (4)放置计算结果的位置(Sink) (5)触发程序执行(execute) 2)开发依赖 在后面 学习中 我们都使用scala编写。如下是开发用到的依赖,如果要打成jar包运行,就要上传到 flink的webUI上面,进行远行。 |
<dependencies> <!-- 这是1.11.6中间版本的依赖 ,目前新版本已经很高了 但是本笔记很多代码原本是用的是1.7.2低版本,出现了很多的新功能,建议不要在用旧版的了,先试下1.11.6 ,如果报错那就用就版本吧 --> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.11.6</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.11.6</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.11.6</version> </dependency>
</dependencies> |
import org.apache.flink.api.scala._ /** */ // 批处理代码 object WordCount { def main(args: Array[String]): Unit = { // 创建一个批处理的执行环境 val env = ExecutionEnvironment.getExecutionEnvironment // 从文件中读取数据 val inputPath = "D:\\WorkCache_IDEA\\FlinkTutorial\\src\\main\\resources\\hello.txt" val inputDataSet = env.readTextFile(inputPath) // 分词之后做count val wordCountDataSet = inputDataSet.flatMap(_.split(" ")) .map( (_, 1) ) .groupBy(0) //批处理有groupBy .sum(1) // 打印输出 wordCountDataSet.print() } } |
————————————————————————
————————————————————————
import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala._ object StreamWordCount { def main(args: Array[String]): Unit = { val params = ParameterTool.fromArgs(args) val host: String = params.get("host") val port: Int = params.getInt("port") // 创建一个流处理的执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 接收socket数据流 val textDataStream = env.socketTextStream(host, port) // 当然 这样是可以直接输入的 // 逐一读取数据,分词之后进行wordcount val wordCountDataStream = textDataStream.flatMap(_.split("\\s")) // 按空格分隔 .filter(_.nonEmpty) .map( (_, 1) ) .keyBy(0) //流处理有keyBy .sum(1) // 打印输出 wordCountDataStream.print() // 执行任务 env.execute("stream word count job") } } |
运行结果: |
————————————————————————
————————————————————————
————————————————————————
————————————————————————
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。