赞
踩
spark streaming可以接收实时的输入数据流(如上图的kafka、HDFS、TCP socket的数据流等)的高吞吐量、容错性良好的数据处理,然后将处理完的数据推送到HDFS文件系统、数据库或dashboards上。在spark streaming中可以使用机器学习或者图处理算法。spark streaming的大致处理过程如下:
对于像在交易系统上,对流式数据进行实时计算,现在主流的流计算工具有三种:
Storm的延迟最低,一般为几毫秒到几十毫秒,但数据吞吐量较低,每秒能够处理的事件在几十万左右,建设成本高。
Flink是目前国内互联网厂商主要使用的流计算工具,延迟一般在几十到几百毫秒,数据吞吐量非常高,每秒能处理的事件可以达到几百上千万,建设成本低。
Spark通过【Spark Streaming】或【Spark Structured Streaming】支持流计算。
批计算或批处理是处理离线数据。单个处理数据量大,处理速度比较慢。
流计算是处理在线实时产生的数据。单次处理的数据量小,但处理速度更快。
Spark在2.0之前,主要使用的Spark Streaming来支持流计算,其数据结构模型为DStream
,其实就是一个个小批次数据构成的RDD队列。
目前,Spark主要推荐的流计算模块是Structured Streaming,其数据结构模型是Unbounded DataFrame
,即没有边界的数据表。
场景:在数据服务器中,基于tcp socket接收到的数据中统计文本数。
(1)创建StreamingContext。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
(2)创建DStream,指定localhost的ip和port(即确定socket)。
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
DStream的每条记录都是一行文本,现在需要根据句子中每个单词之间的空格,分隔成一个个单词:
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
以上只是transformation操作,接下来是action部分:
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
完整代码如下:
r""" Counts words in UTF8 encoded, '\n' delimited text received from the network every second. Usage: network_wordcount.py <hostname> <port> <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. To run this on your local machine, you need to first run a Netcat server `$ nc -lk 9999` and then run the example `$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999` """ import sys from pyspark import SparkContext from pyspark.streaming import StreamingContext if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr) sys.exit(-1) sc = SparkContext(appName="PythonStreamingNetworkWordCount") # 创建StreamingContext ssc = StreamingContext(sc, 1) # 创建DStream,确定localhost和port lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) counts = lines.flatMap(lambda line: line.split(" "))\ .map(lambda word: (word, 1))\ .reduceByKey(lambda a, b: a+b) counts.pprint() # action部分 ssc.start() ssc.awaitTermination()
PS:跑上面代码前需要运行Netcat数据服务器,可以通过yum install -y nc
下载Netcat(一个强大的网络工具);可以通过nc -lk 9999
测试使用,然后新开一个terminal,nc ip:9999
。
filter
筛选行不包含Grass的文本flatmap
对文本行进行拆分[1] https://spark.apache.org/docs/latest/streaming-programming-guide.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。