赞
踩
目录
1、利用Spark Streaming对三种类型的基本数据源的数据进行处理
2、利用Spark Streaming对Kafka高级数据源的数据进行处理
4、把DStream的数据输出保存到文本文件或MySQL数据库中
1、通过实验掌握Spark Streaming的基本编程方法;
2、熟悉利用Spark Streaming处理来自不同数据源的数据。
3、熟悉DStream的各种转换操作。
4、熟悉把DStream的数据输出保存到文本文件或MySQL数据库中。
1、参照教材示例,利用Spark Streaming对三种类型的基本数据源的数据进行处理。
2、参照教材示例,完成kafka集群的配置,利用Spark Streaming对Kafka高级数据源的数据进行处理,注意topic为你的姓名全拼。
3、参照教材示例,完成DStream的两种有状态转换操作。
4、参照教材示例,完成把DStream的数据输出保存到文本文件或MySQL数据库中。
(1)文件流
首先打开第一个终端作为数据流终端,创建一个logfile目录:
- [root@bigdata zhc]# cd /home/zhc/mycode/sparkstreaming
- [root@bigdata sparkstreaming]# mkdir logfile
- [root@bigdata sparkstreaming]# cd logfile
然后打开第二个终端作为流计算终端,在“/logfile/”目录下面新建一个py程序:
[root@bigdata logfile]# vim FileStreaming.py
输入如下代码:
- #/home/zhc/mycode/sparkstreaming/logfile/FileStreaming.py
-
- from pyspark import SparkContext, SparkConf
- from pyspark.streaming import StreamingContext
-
- conf = SparkConf()
- conf.setAppName('TestDStream')
- conf.setMaster('local[2]')
- sc = SparkContext(conf = conf)
- ssc = StreamingContext(sc, 10)
- lines = ssc.textFileStream('file:///home/zhc/mycode/sparkstreaming/logfile')
- words = lines.flatMap(lambda line: line.split(' '))
- wordCounts = words.map(lambda x : (x,1)).reduceByKey(lambda a,b:a+b)
- wordCounts.pprint()
- ssc.start()
- ssc.awaitTermination()
保存该文件并执行如下命令:
[root@bigdata logfile]# spark-submit FileStreaming.py
然后我们进入数据流终端,在logfile目录下新建一个log2.txt文件,然后往里面输入一些英文语句后保存退出,再次切换到流计算终端,就可以看见打印出单词统计信息了。
(2)套接字流
1)使用套接字流作为数据源
继续在流计算端的sparkstreaming目录下创建一个socket目录,然后在该目录下创建一个NetworkWordCount.py程序:
- [root@bigdata sparkstreaming]# mkdir socket
- [root@bigdata sparkstreaming]# cd socket
- [root@bigdata socket]# vim NetworkWordCount.py
输入如下代码:
- #/home/zhc/mycode/sparkstreaming/socket/NetworkWordCount.py
-
- from __future__ import print_function
- import sys
- from pyspark import SparkContext
- from pyspark.streaming import StreamingContext
-
- if __name__ == "__main__":
- if len(sys.argv) != 3:
- print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)
- exit(-1)
- sc = SparkContext(appName="PythonStreamingNetworkWordCount")
- ssc = StreamingContext(sc, 5)
- lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
- counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
- counts.pprint()
- ssc.start()
- ssc.awaitTermination()
再在数据流终端启动Socket服务器端:
[root@bigdata logfile]# nc -lk 9999
然后再进入流计算终端,执行如下代码启动流计算:
[root@bigdata socket]# spark-submit NetworkWordCount.py localhost 9999
然后在数据流终端内手动输入一行英文句子后回车,多输入几次,流计算终端就会不断执行词频统计并打印出信息。
2)使用Socket编程实现自定义数据源
下面我们再前进一步,把数据源头的产生方式修改一下,不要使用nc程序,而是采用自己编写的程序产生Socket数据源。在数据流终端执行以下命令,编写DataSourceSocket.py文件:
- [root@bigdata logfile]# cd /home/zhc/mycode/sparkstreaming/socket
- [root@bigdata socket]# vim DataSourceSocket.py
输入如下代码:
- #/home/zhc/mycode/sparkstreaming/socket/DataSourceSocket.py
- import socket
- # 生成socket对象
- server = socket.socket()
- # 绑定ip和端口
- server.bind(('localhost', 9999))
- # 监听绑定的端口
- server.listen(1)
- while 1:
- # 为了方便识别,打印一个“我在等待”
- print("I'm waiting the connect...")
- # 这里用两个值接受,因为连接上之后使用的是客户端发来请求的这个实例
- # 所以下面的传输要使用conn实例操作
- conn,addr = server.accept()
- # 打印连接成功
- print("Connect success! Connection is from %s " % addr[0])
- # 打印正在发送数据
- print('Sending data...')
- conn.send('I love hadoop I love spark hadoop is good spark is fast'.encode())
- conn.close()
- print('Connection is broken.')
继续在数据流终端执行如下命令启动Socket服务端:
[root@bigdata socket]# spark-submit DataSourceSocket.py
再进入流计算终端,执行如下代码启动流计算:
[root@bigdata socket]# spark-submit NetworkWordCount.py localhost 9999
(3)RDD队列流
继续在sparkstreaming目录下新建rddqueue目录并在该目录下创建RDDQueueStream.py程序:
- [root@bigdata sparkstreaming]# mkdir rddqueue
- [root@bigdata sparkstreaming]# cd rddqueue
- [root@bigdata rddqueue]# vim RDDQueueStream.py
输入如下代码:
- #/home/zhc/mycode/sparkstreaming/rddqueue/RDDQueueStreaming.py
- import time
- from pyspark import SparkContext
- from pyspark.streaming import StreamingContext
- if __name__ == "__main__":
- sc = SparkContext(appName="PythonStreamingQueueStream")
- ssc = StreamingContext(sc, 2)
- #创建一个队列,通过该队列可以把RDD推给一个RDD队列流
- rddQueue = []
- for i in range(5):
- rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
- time.sleep(1)
- #创建一个RDD队列流
- inputStream = ssc.queueStream(rddQueue)
- mappedStream = inputStream.map(lambda x: (x % 10, 1))
- reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
- reducedStream.pprint()
- ssc.start()
- ssc.stop(stopSparkContext=True, stopGraceFully=True)
保存退出后,进入流计算终端再执行如下命令:
[root@bigdata rddqueue]# spark-submit RDDQueueStream.py
此过程可以参照这篇博客的第四、五部分内容:
说明:上面的词频统计程序NetworkWordCount.py采取了无状态转换操作。
(1)滑动窗口转换操作
在socket目录下创建WindowedNetworkWordCount.py程序并输入如下代码:
- #/home/zhc/mycode/sparkstreaming/socket/WindowedNetworkWordCount.py
- from __future__ import print_function
- import sys
- from pyspark import SparkContext
- from pyspark.streaming import StreamingContext
- if __name__ == "__main__":
- if len(sys.argv) != 3:
- print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)
- exit(-1)
- sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")
- ssc = StreamingContext(sc, 10)
- ssc.checkpoint("file:///home/zhc/mycode/sparkstreaming/socket/checkpoint")
- lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
- counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
- counts.pprint()
- ssc.start()
- ssc.awaitTermination()
然后在数据流终端执执行如下命令运行nc程序:
- [root@bigdata sparkstreaming]# cd /home/zhc/mycode/sparkstreaming/socket
- [root@bigdata socket]# nc -lk 9999
然后再在流计算终端运行WindowedNetworkWordCount.py代码:
[root@bigdata socket]# spark-submit WindowedNetworkWordCount.py localhost 9999
这时,可以查看流计算终端内显示的词频动态统计结果,可以看到,随着时间的流逝,词频统计结果会发生动态变化。
(2)updateStateByKey操作
在“/home/zhc/mycode/sparkstreaming/”路径下新建目录“/stateful”,并在该目录下新建代码文件NetworkWordCountStateful.py。
- [root@bigdata sparkstreaming]# mkdir stateful
- [root@bigdata sparkstreaming]# cd stateful
- [root@bigdata stateful]# vim NetworkWordCountStateful.py
输入如下代码:
- #/home/zhc/mycode/sparkstreaming/stateful/NetworkWordCountStateful.py
- from __future__ import print_function
- import sys
- from pyspark import SparkContext
- from pyspark.streaming import StreamingContext
- if __name__ == "__main__":
- if len(sys.argv) != 3:
- print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
- exit(-1)
- sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
- ssc = StreamingContext(sc, 1)
- ssc.checkpoint("file:///home/zhc/mycode/sparkstreaming/stateful/")
- # RDD with initial state (key, value) pairs
- initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
- def updateFunc(new_values, last_sum):
- return sum(new_values) + (last_sum or 0)
- lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
- running_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).updateStateByKey(updateFunc, initialRDD=initialStateRDD)
- running_counts.pprint()
- ssc.start()
- ssc.awaitTermination()
在“数据源终端”,执行如下命令启动nc程序:
[root@bigdata stateful]# nc -lk 9999
在“流计算终端”,执行如下命令提交运行程序:
[root@bigdata stateful]# spark-submit NetworkWordCountStateful.py localhost 9999
在数据源终端内手动输入一些单词并回车,再切换到流计算终端,可以看到已经输出了类似如下的词频统计信息:
(1)把DStream输出到文本文件中
在stateful目录下新建NetworkWordCountStatefulText.py文件:
[root@bigdata stateful]# vim NetworkWordCountStatefulText.py
输入如下代码:
- #/home/zhc/mycode/sparkstreaming/stateful/NetworkWordCountStatefulText.py
- from __future__ import print_function
- import sys
- from pyspark import SparkContext
- from pyspark.streaming import StreamingContext
- if __name__ == "__main__":
- if len(sys.argv) != 3:
- print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
- exit(-1)
- sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
- ssc = StreamingContext(sc, 1)
- ssc.checkpoint("file:///home/zhc/mycode/sparkstreaming/stateful/statefultext")
- # RDD with initial state (key, value) pairs
- initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
- def updateFunc(new_values, last_sum):
- return sum(new_values) + (last_sum or 0)
- lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
- running_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).updateStateByKey(updateFunc, initialRDD=initialStateRDD)
- running_counts.saveAsTextFiles("file:///home/zhc/mycode/sparkstreaming/stateful/statefultext/output")
- running_counts.pprint()
- ssc.start()
- ssc.awaitTermination()
在“数据源终端”,执行如下命令启动nc程序:
[root@bigdata stateful]# nc -lk 9999
在“流计算终端”,执行如下命令提交运行程序:
[root@bigdata stateful]# spark-submit NetworkWordCountStatefulText.py localhost 9999
在数据源终端内手动输入一些单词并回车,再切换到流计算终端,可以看到已经输出了类似如下的词频统计信息:
在“/home/zhc/mycode/sparkstreaming/stateful/statefultext”目录下便可查看到如下输出目录结果:
进入某个目录下,就可以看到类似part-00000的文件,里面包含了流计算过程的输出结果。
(2)把DStream写入到MySQL数据库中
首先启动MySQL数据库:
- [root@bigdata stateful]# systemctl start mysqld.service
- [root@bigdata stateful]# mysql -u root -p
然后创建spark数据库和wordcount表:
- mysql> use spark;
- mysql> create table wordcount (word char(20), count int(4));
然后再在终端安装python连接MySQL的模块:
[root@bigdata stateful]# pip3 install PyMySQL
在stateful目录并在该目录下创建NetworkWordCountStatefulDB.py文件:
[root@bigdata stateful]# vim NetworkWordCountStatefulDB.py
输入如下代码:
- #/home/zhc/mycode/sparkstreaming/stateful/NetworkWordCountStatefulDB.py
- from __future__ import print_function
- import sys
- import pymysql
- from pyspark import SparkContext
- from pyspark.streaming import StreamingContext
- if __name__ == "__main__":
- if len(sys.argv) != 3:
- print("Usage: NetworkWordCountStateful <hostname> <port>", file=sys.stderr)
- exit(-1)
- sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
- ssc = StreamingContext(sc, 1)
- ssc.checkpoint("file:///home/zhc/mycode/sparkstreaming/stateful/statefuldb")
- # RDD with initial state (key, value) pairs
- initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
- def updateFunc(new_values, last_sum):
- return sum(new_values) + (last_sum or 0)
- lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
- running_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).updateStateByKey(updateFunc, initialRDD=initialStateRDD)
- running_counts.pprint()
- def dbfunc(records):
- db = pymysql.connect(host="localhost",user="root",password="MYsql123!",database="spark")
- cursor = db.cursor()
- def doinsert(p):
- sql = "insert into wordcount(word,count) values ('%s', '%s')" % (str(p[0]), str(p[1]))
- try:
- cursor.execute(sql)
- db.commit()
- except:
- db.rollback()
- for item in records:
- doinsert(item)
- def func(rdd):
- repartitionedRDD = rdd.repartition(3)
- repartitionedRDD.foreachPartition(dbfunc)
- running_counts.foreachRDD(func)
- ssc.start()
- ssc.awaitTermination()
在“数据源终端”,执行如下命令启动nc程序:
[root@bigdata stateful]# nc -lk 9999
在“流计算终端”,执行如下命令提交运行程序:
[root@bigdata stateful]# spark-submit NetworkWordCountStatefulDB.py localhost 9999
在数据源终端内手动输入一些单词并回车,再切换到流计算终端,可以看到已经输出了类似如下的词频统计信息:
到MySQL终端便可以查看wordcount表中的内容:
mysql> select * from wordcount;
.......
Spark Streaming是一个用于实时数据处理的流式计算框架,它基于 Apache Spark 平台,提供了高可靠性、高吞吐量和容错性强等特点。在进行 Spark Streaming 编程的实验中,掌握了Spark Streaming的基本编程方法;能够利用Spark Streaming处理来自不同数据源的数据以及DStream的各种转换操作;把DStream的数据输出保存到文本文件或MySQL数据库中。
理解DStream:DStream 是 Spark Streaming 的核心概念,代表连续的数据流。在编程时,我们可以通过输入源(比如 Kafka、Flume、HDFS)创建一个 DStream 对象,并对其进行转换和操作。需要注意的是,DStream 是以时间片为单位组织数据的,因此在编写代码时要考虑时间窗口的大小和滑动间隔。
适当设置批处理时间间隔:批处理时间间隔决定了 Spark Streaming 处理数据的粒度,过小的时间间隔可能导致频繁的任务调度和资源开销,而过大的时间间隔则可能造成数据处理延迟。因此,在实验中需要根据具体场景和需求来选择合适的时间间隔。
使用合适的转换操作:Spark Streaming 提供了丰富的转换操作,如 map、flatMap、filter、reduceByKey 等,可以实现对数据流的转换和处理。在实验中,需要根据具体业务逻辑和需求选择合适的转换操作,并合理组合这些操作,以获取期望的结果。
考虑容错性和数据丢失:Spark Streaming 具备很好的容错性,可以通过记录数据流的偏移量来保证数据不会丢失。在实验中,需要注意配置合适的容错机制,确保数据处理过程中的异常情况能够被恢复,并尽量避免数据丢失。
优化性能和资源利用:对于大规模的实时数据处理任务,性能和资源利用是非常重要的。在实验中,可以通过调整并行度、合理设置缓存策略、使用广播变量等手段来提高性能和资源利用效率。
总的来说,Spark Streaming 是一个功能强大且易用的流式计算框架,通过合理使用其提供的特性和操作,可以实现各种实时数据处理需求。在实验中,需要深入理解其原理和机制,并根据具体需求进行合理配置和优化,以获得良好的性能和结果。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。