赞
踩
滑动窗口补充理解
每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。
时间片 1s,窗口长度 3s,滑动间隔 2s,总之:每隔2s出现前3s的RDD
网官图中所示,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。
<!--spark streaming依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.compile.version}</artifactId>
<version>2.3.2</version>
<scope>provided</scope>
</dependency>
import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds /** * 1.构建StreamContext * 2.打开输入数据源,得到DStream * 3.进行常规算子操作 * 4.开启Streaming引擎 * 5.等待结束 */ object SparkStreaming01 { def main(args: Array[String]): Unit = { /** * 1.构建StreamContext */ val conf=new SparkConf; conf.setAppName("wordCount-streaming"); conf.setMaster("local[2]"); var ssc=new StreamingContext(conf,Seconds(2)); /** * 2.打开输入数据源,得到DStream */ val linesDStream=ssc.socketTextStream("localhost",9999) /** * 3.进行常规算子操作 */ var wcDStream=linesDStream.flatMap(line => line.split("\\s+")).map(word=>(word,1)).reduceByKey(_+_); wcDStream.print(); // wcDStream.foreachRDD(rdd => { // rdd.foreach(println()) // }); /** * 4.开启Streaming引擎 */ ssc.start(); /** * 5.等待结束 */ ssc.awaitTermination(); } }
$ ./nc64.exe -lp 9999
在第二步输入源阶段,将输入源改为文件目录读取。
//监控目录 如果文件系统发生变化 就读取进来
val linesDStream = ssc.textFileStream("F:\\sparkStreming_test")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。