赞
踩
具体代码实现:
pom文件引入:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.study.flink01</groupId> <artifactId>Flink_flink01</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!--java依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.6.1</version> <!-- <scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.6.1</version> </dependency> </dependencies> </project>
package xuwei.tech import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time /** * 需求:滑动窗口计算 *需要实现每隔1秒对最近2秒的数据进行汇总计算 */ object SocketWindowWordCountScala { def main(args: Array[String]): Unit = { //获取socket端口号 val port:Int= try { ParameterTool.fromArgs(args).getInt("port") } catch { case e:Exception =>{ System.err.println("No port set.use default port 9999") } 9999 } //获取flink的运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //链接socket获取输入数字 val text= env.socketTextStream("flink102",port,'\n') //注意:必须要添加这一行隐式转换,否者下面的flatMap方法执行会报错 import org.apache.flink.api.scala._ //解析数据(把数据打平),分组、窗口计算,并且聚合求sum val windowCounts= text.flatMap(line =>line.split("\\s"))//打平,把每一行单词都切开 .map(w => WordWindowCount(w,1)) //把单词转成word,1种形式 .keyBy("word") .timeWindow(Time.seconds(2),Time.seconds(1)) //指定窗口大小,指定间隔时间 .sum("count") //sum或reduce都可以 // .reduce((a,b)=>WordWindowCount(a.word,a.count+b.count)) windowCounts.print().setParallelism(1) //打印到控制台 //执行任务 env.execute("Socket window count") } case class WordWindowCount(word:String,count: Long) }
控制台打印的效果跟上篇:第 1 节 滑动窗口单词计数(Java实现)
的运行效果一样
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。