>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> ssc=StreamingContext(sc,10)
>>> lines=ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
>>> words=lines.flatMap(lambda line:line.split(' '))
>>> wordCounts=words.map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)
>>> wordCounts.pprint()
>>> ssc.start()
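Note that textFileStream only processes files that appear in the monitored directory after ssc.start() is called, so nothing is printed until a new file lands there. One way to trigger a batch from a second Python session is simply to write a fresh file into that directory; a minimal sketch, where the file name log1.txt and its contents are assumptions for illustration:

# Run in a separate Python session: create a new file in the monitored directory
# so that the next 10-second batch picks it up (file name and contents are illustrative).
with open('/usr/local/spark/mycode/streaming/logfile/log1.txt', 'w') as f:
    f.write('I love Spark\nI love Hadoop\n')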
1. Using a socket stream as the data source
First start a simple data source with netcat, which listens on TCP port 9999 and sends whatever is typed into the terminal to any connected client:
nc -lk 9999
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: socket.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="Pythonsocket")
    ssc = StreamingContext(sc, 1)
    # Create a socket-stream input source;
    # sys.argv[1] provides the host address, sys.argv[2] provides the port number
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
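To try the script, start the netcat server above first, then run socket.py with the host and port as arguments (for example localhost and 9999) and type a few words into the netcat terminal; each 1-second batch prints the counts of the words received in that interval. If each batch's result should also be written to disk rather than only printed, the DStream method saveAsTextFiles can be added before ssc.start(); a minimal sketch, assuming the counts DStream defined above and an illustrative output prefix:

# Write every batch of word counts as a set of text files (path prefix is an assumption).
counts.saveAsTextFiles('file:///usr/local/spark/mycode/streaming/socketoutput')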
2. Using socket programming to implement a custom data source
# Create a socket server that sends data to the client (socket.py above)
import socket

server = socket.socket()              # create the socket object
server.bind(('localhost', 9999))      # bind to an IP address and port (bind takes a single tuple)
server.listen(1)                      # listen on the bound port
while 1:
    print("I am waiting for a connection...")
    conn, addr = server.accept()
    print("Connect success! Connection is from %s" % addr[0])
    print("Sending data...")
    conn.send("I love Spark I love Hadoop I like Spark".encode())
    conn.close()
    print("Connection is broken")
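The server above sends one message per connection and then closes it. A minimal sketch of a variant that keeps a single connection open and sends one newline-terminated line per second, so that each streaming batch receives fresh data to count (the message text and the 1-second interval are assumptions):

# Looping variant of the server: keep the connection open and push one line
# per second so every streaming batch has data to count.
import socket
import time

server = socket.socket()
server.bind(('localhost', 9999))
server.listen(1)
conn, addr = server.accept()
print("Connection from %s" % addr[0])
try:
    while True:
        conn.send("I love Spark I love Hadoop I like Spark\n".encode())
        time.sleep(1)
finally:
    conn.close()
    server.close()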
RDD queue stream
# Spark Streaming pulls data from the rddQueue queue every two seconds,
# while a new RDD is created and put into the queue every second
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    sc = SparkContext(appName="PythonQueueStream")
    ssc = StreamingContext(sc, 2)
    # Build a queue (a list of RDDs) that will be fed to an RDD queue stream
    rddQueue = []
    for i in range(5):
        rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
        time.sleep(1)
    # Create the RDD queue stream
    inputStream = ssc.queueStream(rddQueue)
    # e.g. the element 78 is turned into the tuple (8, 1), keyed by the remainder mod 10
    mappedStream = inputStream.map(lambda x: (x % 10, 1))
    # Count how many times each remainder occurs
    reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
    reducedStream.pprint()
    ssc.start()
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
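Each queued RDD holds the integers 1 through 1000, so every remainder 0 through 9 occurs exactly 100 times, and each 2-second batch should therefore report a count of 100 for every key. The same computation can be checked without streaming in an ordinary pyspark shell (sc is the shell's SparkContext):

>>> sc.parallelize(range(1, 1001)).map(lambda x: (x % 10, 1)).reduceByKey(lambda a, b: a + b).collect()

The collected result contains ten pairs, each with a count of 100.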