赞
踩
本站已停止更新,查看最新内容请移至本人博客 Wilen’s Blog
Spark安装配置
Spark基本概念
Spark基础知识(PySpark版)
Spark机器学习(PySpark版)
Spark流数据处理(PySpark版)
Hadoop的MapReduce及Spark SQL等只能进行离线计算,无法满足实时性要求较高的业务需求,例如实时推荐、实时网站性能分析等,流式计算可以解决这些问题。目前有三种比较常用的流式计算框架,它们分别是Twitter Storm,Spark Streaming和Samza。
Spark Streaming用于进行实时流数据的处理,它具有高扩展、高吞吐率及容错机制。
如下图所示,Spark Streaming 把流式计算当做一系列连续的小规模批处理(batch)来对待。Spark Streaming 接收输入数据流,并在内部将数据流按均匀的时间间隔分为多个较小的batch。然后再将这部分数据交由Spark引擎进行处理,处理完成后将结果输出到外部文件。
Spark Streaming的主要抽象是离散流(DStream),它代表了前面提到的构成数据流的那些batch。DStream可以看作是多个有序的RDD组成,因此它也只通过map, reduce, join and window等操作便可完成实时数据处理。,另外一个非常重要的点便是,Spark Streaming可以与Spark MLlib、Graphx等结合起来使用,功能十分强大,似乎无所不能。
目前,围绕Spark Streaming有四种广泛的场景:
下图提供了Spark driver、workers、streaming源与目标间的数据流:
Spark Streaming内置了一系列receiver,可以接收很多来源的数据,最常见的是Apache Kafka、Flume、HDFS/S3、Kinesis和Twitter。
Spark Streaming可整合多种输入数据源,如Kafka、Flume、HDFS,甚至是普通的TCP套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里。
编写Spark Streaming程序的基本步骤是:
下面我们使用Python的Spark Streaming来创建一个简单的单词计数例子。
这个字数计数示例将使用Linux/Unix nc命令——它是一种读写跨网络连接数据的简单实用程序。我们将使用两个不同的bash终端,一个使用nc命令将多个单词发送到我们计算机的本地端口(9999),另一个终端将运行Spark Streaming来接收这些字,并对它们进行计数。
#!/usr/bin/env python3 # Create a local SparkContext and Streaming Contexts from pyspark import SparkContext from pyspark.streaming import StreamingContext import sys # Create sc with two working threads sc = SparkContext('local[2]','NetworkWordCount') # Spark Streaming入口点(每隔一秒钟运行一次微批次) ssc = StreamingContext(sc, 1) # 创建DStream输入源:套接字流 lines = ssc.socketTextStream(sys.argv[1], sys.argv[2]) # Split lines into words and count wordCounts = lines.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a+b) wordCounts.pprint() # 启动Spark Streaming,并等待终止命令 ssc.start() ssc.awaitTermination()
如前所述,现在有了脚本,打开两个终端窗口:一个用于您的nc命令,另一个用于Spark Streaming程序。
要从其中一个终端启动nc命令,请键入:
$ nc -lk 9999
从这个点开始,你在这个终端所输入的一切都将被传送到9999端口。本例中,敲入green
这个词三次,blue
五次。
从另一个终端屏幕,我们来运行刚创建的Python流脚本(NetworkWordCount.py)。
$ spark-submit NetworkWordCount.py localhost 9999
该命令将运行脚本,读取本地计算机(即localhost)端口9999以接收发送到该套接字的任何内容。由于你已经在第一个终端将信息发送端口,因此在启动脚本后不久,Spark Streaming程序会读取发送到端口9999的任何单词,并按照以下屏幕截图中所示的样子执行单词计数:
$ nc -lk 9999
green green green blue blue blue blue blue
-------------------------------------------
Time: 2018-12-24 11:30:26
-------------------------------------------
-------------------------------------------
Time: 2018-12-24 11:30:27
-------------------------------------------
('blue', 5)
('green', 3)
文件流:包括文本格式和任意HDFS的输入格式。创建DStream输入源示例
lines = ssc.textFileStream('wordfile')
套接字流 (socket):从一个本地或远程主机的某个端口服务上读取数据。它无法提供端到端的容错保障,Socket源一般仅用于测试或学习用途。
创建DStream输入源示例
lines = ssc.socketTextStream("local", 9999)
RDD序列流:在调试Spark Streaming应用程序的时候,我们可以使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream
kafka:Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
在公司的大数据生态系统中,可以把Kafka作为数据交换枢纽,不同类型的分布式系统(关系数据库、NoSQL数据库、流处理系统、批处理系统等),可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实时高效交换。
下图为kafka组成
我们可以创建基于Kafka的DStream
from pyspark.streaming.kafka import KafkaUtils
kvs = KafkaUtils.createStream(...)
无状态转化操作:把简单的RDDtransformation分别应用到每个批次上,每个批次的处理不依赖于之前的批次的数据。
有状态转化操作:需要使用之前批次的数据或者中间结果来计算当前批次的数据。包括基于滑动窗口的转化操作,和追踪状态变化的转化操作(updateStateByKey)
无状态转化操作 | 说明(同RDD转化类似) |
---|---|
map(func) | 映射变换 |
flatMap(func) | 同RDD |
filter(func) | 返回过滤后新的DStream |
reduce(func) | 聚合 |
count() | 计数 |
union(otherStream) | 合并 |
countByValue() | 值计数 |
reduceByKey(func, [numTasks]) | 对于相同key的数据聚合 |
join(otherStream, [numTasks]) | 交集 |
cogroup(otherStream, [numTasks]) | |
transform(func) | 任意变换 |
repartition(numPartitions) | 重分区 |
滑动窗口转化操作
window(windowLength, slideInterval)
基于源DStream产生的窗口化的批数据,计算得到一个新的Dstream
countByWindow(windowLength, slideInterval)
返回流中元素的一个滑动窗口数
reduceByWindow(func, windowLength, slideInterval)
返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算
countByValueAndWindow(windowLength, slideInterval, [numTasks])
当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率
reduceByKeyAndWindow方法
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入)。
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.reduceByKeyAndWindow(lambda x,y:x+y, lambda x,y:x-y, 30, 10)
counts.pprint()
UpdateStateByKey转化方法:需要在跨批次之间维护状态时,需要UpdateStateByKey方法。通俗点说,假如我们想知道一个用户最近访问的10个页面是什么,可以把键设置为用户ID,然后UpdateStateByKey就可以跟踪每个用户最近访问的10个页面,这个列表就是“状态”对象。
回到本章初的应用案例(无状态转化),1秒在nc端键入3个green和5个blue,2秒再键入1个gohawks,4秒再键入2个green。
下图展示了lines DStream及其微批量数据:
下图表示我们计算的是有状态的全局聚合:
代码如下
#!/usr/bin/env python3 # Create a local SparkContext and Streaming Contexts from pyspark import SparkContext from pyspark.streaming import StreamingContext import sys # Create sc with two working threads sc = SparkContext('local[2]','NetworkWordCount') # Spark Streaming入口点(每隔一秒钟运行一次微批次) ssc = StreamingContext(sc, 1) # 为了确保持续运行可以容错,配置一个检查点 ssc.checkpoint("checkpoint") # 创建DStream输入源:套接字流 lines = ssc.socketTextStream(sys.argv[1], sys.argv[2]) # 定义更新函数:sum of the (key, value) pairs def updateFunc(new_values, last_sum): return sum(new_values) + (last_sum or 0) lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) # RDD with initial state (key, value) pairs initialStateRDD = sc.parallelize([]) running_counts = lines.flatMap(lambda line: line.split(" "))\ .map(lambda word: (word, 1))\ .updateStateByKey(updateFunc, initialRDD=initialStateRDD) running_counts.pprint() ssc.start() ssc.awaitTermination()
两者的主要区别在于使用了updateStateByKey方法,该方法将执行前面提到的执行加和的updateFunc。updateStateByKey是Spark Streaming的方法,用于对数据流执行计算,并以有利于性能的方式更新每个key的状态。通常在Spark 1.5及更早版本中使用updateStateByKey,这些有状态的全局聚合的性能与状态的大小成比例,从Spark 1.6起,应该使用mapWithState。
对于Spark 2.0,Apache Spark社区致力于通过引入结构化流(structured streaming)的概念来简化流,结构化流将Streaming概念与Dataset/DataFrame相结合。结构化流式引入的是增量,当处理一系列数据块时,结构化流不断地将执行计划应用在所接收的每个新数据块集合上。通过这种运行方式,引擎可以充分利用Spark DataFrame/Dataset所包含的优化功能,并将其应用于传入的数据流。
编写Structured Streaming程序的基本步骤包括:
我们来看一下使用updateStateByKey的有状态流的文字计数脚本,并将其改成一个Structured Streaming的文字计数脚本:
#!/usr/bin/env python3 # Import necessary classes and create a local SparkSession from pyspark.sql import SparkSession from pyspark.sql.functions import split from pyspark.sql.functions import explode spark = SparkSession \ .builder \ .appName("StructuredNetworkWordCount") \ .getOrCreate() # from connection to localhost: 9999 lines = spark \ .readStream \ .format("socket") \ .option("host", "localhost") \ .option("port", 9999) \ .load() # split lines into words words = lines.select( explode( split(lines.value, " ") ).alias("word") ) # Generate runing word count wordCounts = words.groupBy("word").count() # 输出至控制台 query = wordCounts \ .writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="8 seconds") \ .start() query.awaitTermination()
取而代之的,流那部分的代码是通过调用readStream来初始化的,我们可以使用熟悉的DataFrame groupBy语句和count来生成运行的文字计数。
由于程序中需要用到拆分字符串和展开数组内的所有单词的功能,所以引用了来自pyspark.sql.functions里面的split和explode函数。
让我们回到第一个终端运行我们的nc作业:
$ nc -lk 9999
检查以下输出。如你所见,你既能得到有状态流的优势,还能使用更为熟悉的DataFrame API:
------------------------------------------- Batch: 0 ------------------------------------------- +----+-----+ |word|count| +----+-----+ | cat| 1| | dog| 3| +----+-----+ ------------------------------------------- Batch: 1 ------------------------------------------- +----+-----+ |word|count| +----+-----+ | cat| 2| | dog| 3| | owl| 1| +----+-----+ ------------------------------------------- Batch: 2 ------------------------------------------- +----+-----+ |word|count| +----+-----+ | cat| 2| | dog| 4| | owl| 2| +----+-----+
数据流:通过调用readStream来初始化。支持的格式包括文件流(csv、json、orc、parquet、text)、Kafka、套接字流(socket)、Rate源等。
lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", 'wordcount-topic') \
.load()
输出:DataFrame/Dataset的.writeStream()方法将会返回DataStreamWriter接口,接口通过.start()真正启动流计算,并将DataFrame/Dataset写入到外部的输出接收器,DataStreamWriter接口有以下几个主要函数:
输出模式用于指定写入接收器的内容,主要有以下几种:
query = windowedCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.option('truncate', 'false') \
.trigger(processingTime="10 seconds") \
.start()
query.awaitTermination()
参考链接:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。