当前位置:   article > 正文

大数据程序员必会之Spark框架上的实时流计算框架SparkStreaming_spark 实时计算

spark 实时计算

Spark Streaming

如今在大数据的世界里,Spark可谓是众所周知,风光无限了。在批处理领域取得巨大成功后,Spark开始向流计算领域进军,于是诞生了Spark Streaming。Spark Streaming是建立在

,提供了可扩展、高吞吐和错误容忍的实时数据流处理功能。

系统架构

图6-2描述了Spark Streaming的工作原理。Spark Streaming构建在Spark平台上,充分利用了Spark的核心处理引擎。Spark Streaming将接收的实时数据流分成一个个的RDD,然后由Spark引擎对RDD做各种处理,其中每个RDD实际是一个小的块数据。所以,Spark Streaming本质上是将流数据分成一段段块数据后,对其进行连续不断的批处理。

图6-2 Spark Streaming将流数据切分为块数据后进行处理

流的描述

对于流计算过程的描述,Sparking Streamingg包含以下核心概念。

·RDD:Spark引擎的核心概念,代表一个数据集合,是Spark进行数据处理的计算单元。

·DStream:Spark Streaming对流的抽象,代表连续数据流。在系统内部,DStream由一系列的RDD构成,每个RDD代表一段间隔内的数据。

·Transformation:代表Spark Streaming对DStream的处理逻辑。目前,DStream提供了很多与Transformation相关的API,包括map、flatMap、filter、reduce、union、join、transform和updateStateByKey等。通过这些API,可以对DStream做各种转换,从而将一个数据流变为另一个数据流。

·Outpu.Operations:Spark Streaming将DStream输出到控制台、数据库或文件系统等外部系统中的操作。目前,DStream支持的output Operations包括print、saveAsTextFiles、saveAsObjectFiles、saveAsHadoopFiles和foreachRDD。由于这些操作会触发外部系统访问,所以DStream各种转化的执行实际上由这些操作触发。

流的执行

与Storm类似,我们从流的输入、流的处理、流的输出和反向压力4个方面来讨论Spark Streaming中流的执行过程。

1.流的输入

Spark Streaming提供了3种创建输入数据流的方式。

·基础数据源。通过StreamingContext的相关API,直接构建输入数据流。这类API通常从Socket、文件或内存中构建输入数据流,如socketTextStream、textFileStream、queueStream等。

·高级数据源。通过外部工具类从Kafka、Flume、Kinesis等消息中间件或消息源构建输入数据流。

·自定义数据源。当用户实现了
org.apache.spark.streaming.receiver抽象类时,就可以实现一个自定义数据源了。

Spark Streaming用DStream来表示数据流,所以输入数据流也表示为DStream。下面的示例演示了从TCP连接中构建文本数据输入流的过程。

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCount

Example");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

2.流的处理

Spark Streaming对流的处理是通过DStream的各种转化操作API完成的。DStream的转换操作大体上也包含3类操作。第一类是常用的流式处理操作,如map、filter、reduce、count、transform等。第二类是流数据状态相关的操作,如union、join、cogroup、window等。第三类是流信息状态相关的操作,目前有updateStateByKey和mapWithState。

下面是一个对DStream进行转化操作的例子。

// 将每一行分割成单词,然后统计单词出现次数

JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("

")).iterator());

JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));

JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 +

i2);;

在上面的例子中,先从Socket中读出文本流lines,对每行文本分词后,用flatMap转化为单词流words;然后用mapToPair将单词流words转化为计数元组流pairs;最后,以单词为分组进行数量统计,通过reduceByKey转化为单词计数流wordCounts。

3.流的输出

Spark Streaming允许DStream输出到外部系统,这是通过DStream的各种输出操作完成的。DStream的输出操作可以将数据输出到控制台、文件系统或数据库等。目前DStream的输出操作有print、saveAsTextFiles、saveAsHadoopFiles和foreachRDD等。其中,foreachRDD是一个通用的DStream输出接口,用户可以通过foreachRDD自定义各种Spar.Streaming输出方式。下面的例子演示了将单词计数流输出到控制台。

wordCounts.print();

4.反向压力

早期版本的Spark不支持反向压力,但从Spar.1.5版本开始,Spark Streaming引入了反向压力功能。默认情况下,SparkStreaming的反向压力功能是关闭的。当要使用反向压力功能时,需要将
spark.streaming.backpressure.enabled设置为True。

整体而言,Spark的反向压力功能借鉴了工业控制中PID控制器的思路,其工作原理如下。首先,当Spark处理完每批数据时,统计每批数据的处理结束时间、处理时延、等待时延、处理消息数等信息。然后,Spark根据统计信息估计处理速度,并将这个估计值通知给数据生产者。最后,数据生产者根据估计的处理速度,动态调整生产速度,最终使得生产速度与处理速度相匹配。

流的状态

在Spark Streaming中,流的状态管理是在部分DStream提供的转化操作中实现的。

在流数据状态方面,由于DStream本身将数据流分成RDD做批处理,所以Spark Streaming天然就需要对数据进行缓存和状态管理。换言之,组成DStream的一个个RDD就是一种流数据状态。在DStream上,提供了一些窗口相关的转化API,实现对流数据的窗口管理。在窗口之上还提供了count和reduce两类聚合功能。另外,DStream还提供了union、join和cogroup 3种在多个流之间做关联操作的API。

在流信息状态方面,DStream的updateStateByKey操作和mapWithState操作提供了流信息状态管理的方法。updateStateByKey和mapWithState都可以基于key来记录历史信息,并在新的数据到来时对这些信息进行更新。不同的是,updateStateByKey会返回记录的所有历史信息,而mapWithState只会返回处理当前一批数据时更新的信息。

就好像,前者返回了一个完整的直方图,而后者只是返回直方图中发生变化的柱条。由此可见,mapWithState比updateStateByKey的性能优越很多。从功能上讲,如果不是用于报表生成的场景,大多数实时流计算应用使用mapWithState会更合适。

消息传达可靠性保证

Spark Streaming对消息可靠性的保证是由数据接收、数据处理和数据输出共同决定的。从1.2版本开始,Spark引入WAL(Write AheadLogs)机制,可以将接收的数据先保存到错误容忍的存储空间。当开启WAL机制后,再配合可靠的数据接收器(如Kafka),SparkStreaming能够实现“至少一次”的消息接收功能。从1.3版本开始,Spark又引入了Kafka Direct API,进而可以实现“精确一次”的消息接收功能。由于Spark Streaming对数据的处理是基于RDD完成的,而RDD提供了“精确一次”的消息处理功能,所以在数据处理部分,Spark Streaming天然具有“精确一次”的消息可靠性保证机制。

但是,Spark Streaming的数据输出部分目前只具有“至少一次”的可靠性保证机制。也就是说,经过处理的数据可能会被多次输出到外部系统。在一些场景下,这么做不会有什么问题。例如,输出数据被保存到文件系统,重复发送的结果只是覆盖之前写过一遍的数据。

但是在另一些场景下,如需要根据输出增量更新数据库,那就需要做一些额外的去重处理了。一种可行的方法是,在各个RDD中新增一个唯一标识符来表示这批数据,然后在写入数据库时,使用这个唯一标识符来检查数据之前是否写入过。当然,这时写入数据库的动作需要使用事务来保证一致性。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/906418
推荐阅读
相关标签
  

闽ICP备14008679号