赞
踩
目录
大数据的计算模式主要分为批量计算(batch computing)、流式计算(stream computing)、交互计算(interactive computing)、图计算(graph computing)等。其中,流式计算和批量计算是两种主要的大数据计算模式,分别适用于不同的大数据应用场景
首先我们简单看一下计算任务的大致流程:
图中显示了了一个计算的基本流程,receiver处负责从数据源接收数据,并发送给下游的task,数据由task处理后有sink端输出。
从上图看的处,批量和流式处理数据粒度不一样,批量每次处理一定大小的数据块(输入一般采用文件系统),一个task处理完一个数据块之后,才将处理好的中间数据发送给下游。流式计算则是以record为单位,task在处理完一条记录之后,立马发送给下游。
假如我们是对一些固定大小的数据做统计,那么采用批量和流式效果基本相同,但是流式有一个好处就是可以实时得到计算中的结果,这对某些应用很有帮助,比如每1分钟统计一下请求server的request次数。
那么问题来了,既然流式系统也可以做批量系统的事情,而且还提供了更多的功能,那为什么还需要批量系统呢?因为早期的流式系统并不成熟,存在如下问题:
1.流式系统的吞吐不如批量系统
2.流式系统无法提供精准的计算
后面的介绍Storm、Spark streaming、Flink主要根据这两点来进行介绍。
①、数据处理单位:
批量计算按数据块来处理数据,每一个task接收一定大小的数据块,比如MR,map任务在处理完一个完整的数据块后(比如128M),然后将中间数据发送给reduce任务。
流式计算的上游算子处理完一条数据后,会立马发送给下游算子,所以一条数据从进入流式系统到输出结果的时间间隔较短(当然有的流式系统为了保证吞吐,也会对数据做buffer)。
这样的结果就是:批量计算往往得等任务全部跑完之后才能得到结果,而流式计算则可以实时获取最新的计算结果。
②、数据源:
批量计算通常处理的是有限数据(bound data),数据源一般采用文件系统,
流式计算通常处理无限数据(unbound data),一般采用消息队列作为数据源。
③、任务类型:
批量计算中的每个任务都是短任务,任务在处理完其负责的数据后关闭,
流式计算往往是长任务,每个work一直运行,持续接受数据源传过来的数据。
习惯上我们认为离线和批量等价;实时和流式等价,但其实这种观点并不完全正确。
假设一种情况:当我们拥有一个非常强大的硬件系统,可以毫秒级的处理Gb级别的数据,那么批量计算也可以毫秒级得到统计结果(当然这种情况非常极端,目前不可能),那我们还能说它是离线计算吗?
所以说离线和实时应该指的是:数据处理的延迟;批量和流式指的是:数据处理的方式。两者并没有必然的关系。事实上Spark streaming就是采用小批量(batch)的方式来实现实时计算。
Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力,以吞吐量高和容错能力强著称。
实时流处理指的是随着数据的实时到达,进行实时计算。目前做实时流计算的框架:
Storm、SparkStreaming、Flink
上图:为SparkStreaming实时流计算的过程。我们总结如下知识点:
①、SparkStreaming接收连续不断的数据流,然后将数据流离散化成一批一批的数据,底层是以批为单位进行处理。
②、SparkStreaming的批大小由程序员自己定义,单位是时间,比如秒或毫秒,所以每一批中的数据量是不同的。
③、由上述机制,决定了SparkStreaming在实时流处理过程中,吞吐量比较高(以批为单位进行处理),但是不能做到低延迟。
④、SparkStreaming中的每一批数据,称为DStream(离散化的数据流Discretized-Stream)
⑤、SparkStreaming就是对每个DStream进行计算处理。DStream底层就是一个RDD。所以表面上是对DStream操作,但底层仍然会转为RDD的操作。比如:map,flatMap,filter,reduceByKey都可用于DStream。
⑥、因为底层会转为RDD的操作,所以SparkStreaming在做流处理时,有天然的容错性保证。这是由RDD的容错机制来决定的。
SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kafka、Flume、Twitter、ZeroMQ和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。
Spark Streaming是将流式计算分解成一系列短小的批处理作业,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据DStream(Discretized-离散化 Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformations操作变为针对Spark中对RDD的Transformations操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加或者存储到外部设备。
对DStream的处理,每个DStream都要按照数据流到达的先后顺序依次进行处理。即SparkStreaming天然确保了数据处理的顺序性。
这样使所有的批处理具有了一个顺序的特性,其本质是转换成RDD的血缘关系。所以,SparkStreaming对数据天然具有容错性保证。
为了提高SparkStreaming的工作效率,你应该合理的配置批的时间间隔, 最好能够实现上一个批处理完某个算子,下一个批刚好到来。
准备监听目录以及数据:
进入spark目录,启动spark:
cd /home/software/spark-2.0.1-bin-hadoop2.7/bin/
sh spark-shell --master=local[2]
代码如下:scala> import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc,Seconds(5));scala> val lines = ssc.textFileStream("file:///home/streaming")
scala> lines.print()
scala> ssc.start()将home下的01.txt拷贝到streaming目录下
此时可以看到监听到了
在home下创建文件02.txt 和03.txt
启动spark:
import org.apache.spark.streaming._
val ssc=new StreamingContext(sc,Seconds(5))val source=ssc.textFileStream("file:///home/streaming")
val result=source.filter{line=>line.split(" ")(1)=="M"}
result.print()ssc.start()
然后将02.txt拷贝到streaming目录下
此时可以发现过滤掉了男性数据
③、实时单词词频统计
进入spark:
sh spark-shell --master=local[2]
代码:
import org.apache.spark.streaming._
val ssc=new StreamingContext(sc,Seconds(5))val source=ssc.textFileStream("file:///home/streaming")
val wordcount=source.flatMap{line=>line.split(" ")}.map{word=>(word,1)}.reduceByKey{_+_}
wordcount.print()
ssc.start()
准备数据
可以发现监听到结果:
底层实现流程:
StreamingContext是Spark Streaming编程的最基本环境对象,就像Spark编程中的SparkContext一样。StreamingContext提供最基本的功能入口,包括从各途径创建最基本的对象DStream(就像Spark编程中的RDD)。
创建StreamingContext的方法很简单,生成一个SparkConf实例,设置程序名,指定运行周期(示例中是5秒),这样就可以了:
val conf = new SparkConf().setAppName("SparkStreamingWordCount")
val sc=new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
运行周期为5秒,表示流式计算每间隔5秒执行一次。这个时间的设置需要综合考虑程序的延时需求和集群的工作负载,应该大于每次的运行时间。
StreamingContext还可以从一个现存的org.apache.spark.SparkContext创建而来,并保持关联,比如上面示例中的创建方法:
val ssc = new StreamingContext(sc, Seconds(5))
StreamingContext创建好之后,还需要下面这几步来实现一个完整的Spark流式计算:
(1)创建一个输入DStream,用于接收数据;
(2)使用作用于DStream上的Transformation和Output操作来定义流式计算(Spark程序是使用Transformation和Action操作);
(3)启动计算,使用streamingContext.start();
(4)等待计算结束(人为或错误),使用streamingContext.awaitTermination();
(5)也可以手工结束计算,使用streamingContext.stop()。
DStream(discretized stream)是Spark Streaming的核心抽象,类似于RDD在Spark编程中的地位。DStream表示连续的数据流,要么是从数据源接收到的输入数据流,要求是经过计算产生的新数据流。DStream的内部是一个RDD序列,每个RDD对应一个计算周期。比如,在上面的WordCount示例中,每5秒一个周期,那么每5秒的数据都分别对应一个RDD,如图所示,图中的时间点1、2、3、4代表连续的时间周期。
所有应用在DStream上的操作,都会被映射为对DStream内部的RDD上的操作,比如上面的WordCount示例中对lines DStream的flatMap操作,如下图
RDD操作将由Spark核心来调度执行,但DStream屏蔽了这些细节,给开发者更简洁的编程体验。
当然,我们也可以直接对DStream内部的RDD进行操作
①、SparkStreaming实现历史批次的累积处理
SparkStreaming提供了checkPoint机制,首先需要设置一个检查点目录,在这个目录,存储了历史周期数据。通过在临时文件中存储中间数据 为历史数据累计处理提供了可能性
进入spark:
import org.apache.spark.streaming._
val ssc=new StreamingContext(sc,Seconds(5))ssc.checkpoint("file:///home/check")
val source=ssc.textFileStream("file:///home/streaming")
val r1=source.flatMap{line=>line.split(" ")}.map{word=>(word,1)}
val r2=r1.updateStateByKey{(seq,op:Option[Int])=>Some(seq.sum+op.getOrElse(0))}
r2.print()
ssc.start()
我们将04.txt拷贝到streaming目录
会发现已经监听到
我们再创建一个05.txt,拷贝到streaming目录
我们发现已经累加了
updateStateByKey:用于实现历史批次的累积处理方法,要求操作的数据类型必须为DStream[(key,value)]
知识点:
1、scala的Option有两个子类,分别为Some和None
2、对于Option类型的取值,可以通过getOrElse(默认值)来进行取值。比如
Some(4).getOrElse(0)=4
None.getOrElse(0)=0
②、SparkStreaming滑动窗口机制
上面例子实现效果是:只要不停止SparkStreaming,会不停将所有的历史批次数据累积到一起处理。但有时需要的并不是这种效果,我们希望能够每隔一段时间重新统计下一段时间的数据,比如每隔一段时间,重新计算一下历史批次数据,而不是一致不停的累积处理。这里就需要用到滑动窗口机制。
在DStream中提供了如下的和滑动窗口相关的方法:、
window(windowLength, slideInterval)
windowLength:窗口长度
slideInterval:滑动区间
需要设定:
1、batch size 批大小,比如=2s
2、window size 窗口大小,比如=8s
3、slide size 滑动大小,比如=2s
相当于每隔2s,重新计算一个窗口(一个窗口中就包含了历史批次数据)的数据
注意:
1、窗口大小和滑动大小必须是批大小的整数倍
2、滑动大小一般小于窗口大小,也可以大于窗口大小,但会造成数据丢失的情况
3、设计历史批次数据的处理,所以需要指定checkpoint目录,用于存储历史批次数据
代码:
进入spark:
import org.apache.spark.streaming._
val ssc=new StreamingContext(sc,Seconds(5))ssc.checkpoint("file:///home/check-window")
val source=ssc.textFileStream("file:///home/streaming")
val r1=source.flatMap{line=>line.split(" ")}.map{word=>(word,1)}
val r2=r1.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(15),Seconds(10))
r2.print()
ssc.start()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。