当前位置:   article > 正文

Spark Streaming介绍以及案例_pysaprk用sparkstreaming具体scv文件案例分析

pysaprk用sparkstreaming具体scv文件案例分析

概观

Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。

数据来源:Kafka,Flume,Kinesis或TCP套接字等,

可以使用高级函数进行复杂算法进行处理map,例如reducejoinwindow

处理后的数据可以推送到文件系统,数据库等

 

Spark Streaming

它的工作原理:

Spark Streaming接收实时输入数据流并将数据分成批处理,然后由Spark引擎处理以批量生成最终结果流

Spark Streaming

Spark Streaming提供称为离散流DStream的高级抽象,表示连续的数据流。DStream可以从来自Kafka,Flume和Kinesis等源的输入数据流创建,也可以通过在其他DStream上应用高级操作来创建。在内部,DStream表示为一系列 RDD。

案例介绍

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}
  3. object StreamDemo {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf().setMaster("local[2]").setAppName("MyTest")
  6. val ssc = new StreamingContext(conf,Seconds(1))
  7. //创建一个DStream来表示来自TCP源的流数据,指定为主机名(例如localhost)和端口(例如9999)。
  8. //此linesDStream表示将从数据服务器接收的数据流。DStream中的每条记录都是一行文本
  9. val lines = ssc.socketTextStream("localhost",9999)
  10. //flatMap是一对多DStream操作,它通过从源DStream中的每个记录生成多个新记录来创建新的DStream。在这种情况下,每行将被分成多个单词,单词流表示为wordsDStream。
  11. val words = lines.flatMap(_.split(" "))
  12. val pairs = words.map(word => (word,1))
  13. val wordCount = pairs.reduceByKey(_+_)
  14. wordCount.print()
  15. //Spark Streaming仅设置启动时将执行的计算,并且尚未启动实际处理。要在设置完所有转换后开始处理
  16. ssc.start()
  17. ssc.awaitTermination()
  18. }
  19. }

使用端口发送数据:

nc -lk 9999

查看端口使用情况:

lsof -i:9999

 

 

 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号