当前位置:   article > 正文

SparkStream_spark streaming中,微批处理的时间间隔由谁来定义?

spark streaming中,微批处理的时间间隔由谁来定义?

1.什么是SparkStreaming?

  • 微批处理的流式(数据)实时计算框架
  • 输入数据以某一时间间隔批量的处理,当批处理间隔缩短到秒级时,即可用于处理实时数据流。
  • 支持从多种数据源获取数据,通过高级函数进行复杂算法处理,最后将处理结果存储到HDFS文件系统,数据库。

2.重要概念

  1. StreamingContext
    类比sparkContext,流计算框架的中枢类,负责环境信息,分发调度
  2. 数据源 Source
  3. 离散流
    DStream,微批处理当中的数据抽象单位。
  4. 输入离散流
    连接到一个外部Source来读取数据的统称
  5. 批数据
    将流式数据转化成时间片为单位数据进行批数据处理
  6. 时间片或批处理时间间隔
    人为对流数据进行定量的标准,以时间片作为拆分流数据的依据。
    一个时间片的数据对应一个RDD实例
  7. 窗口长度
    一个窗口覆盖的流数据的时间长度,必须是批处理时间间隔的倍数。
    窗口分类: 滑动窗口、滚动窗口
  8. 滑动窗口时间间隔
    前窗口到后窗口所经过的时间长度间隔,必须是批处理时间间隔的倍数。

滑动窗口补充理解
每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。
时间片 1s,窗口长度 3s,滑动间隔 2s,总之:每隔2s出现前3s的RDD
网官图中所示,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

3.Scala快速构建SparkStreaming应用

  • Maven pom.xml中添加spark-streaming依赖
<!--spark  streaming依赖  -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.compile.version}</artifactId>
        <version>2.3.2</version>
        <scope>provided</scope>
    </dependency>

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 以StreamingContext为起点,而向Dstream编程
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

/**
 * 1.构建StreamContext 
 * 2.打开输入数据源,得到DStream
 * 3.进行常规算子操作
 * 4.开启Streaming引擎
 * 5.等待结束
 */
object SparkStreaming01 {
  def main(args: Array[String]): Unit = {
    /**
 * 1.构建StreamContext 
 */
    val conf=new SparkConf;
    conf.setAppName("wordCount-streaming");
    conf.setMaster("local[2]");
    var ssc=new StreamingContext(conf,Seconds(2));
    
    /**
 * 2.打开输入数据源,得到DStream
 */
    val linesDStream=ssc.socketTextStream("localhost",9999)
    
    /**
 * 3.进行常规算子操作
 */
    var wcDStream=linesDStream.flatMap(line => line.split("\\s+")).map(word=>(word,1)).reduceByKey(_+_);
    wcDStream.print();
    
//    wcDStream.foreachRDD(rdd => {
//      rdd.foreach(println())
//    });
    
    /**
 * 4.开启Streaming引擎
 */
    ssc.start();
    
    /**
 * 5.等待结束
 */
    ssc.awaitTermination();     
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • netcat 打开对外TCP数据服务端口
  • 在Git Bash上开启
$ ./nc64.exe -lp 9999
  • 1

4.对本地目录为输入数据源

在第二步输入源阶段,将输入源改为文件目录读取。

//监控目录 如果文件系统发生变化 就读取进来
    val linesDStream = ssc.textFileStream("F:\\sparkStreming_test")
  • 1
  • 2
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/975656
推荐阅读
相关标签
  

闽ICP备14008679号