当前位置:   article > 正文

spark streaming(文件流,套接字流,RDD队列流)_使用文件流作为数据源

使用文件流作为数据源

文件流

>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> ssc=StreamingContext(sc,10)
>>> lines=ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')
>>> words=lines.flatMap(lambda line:line.split(' '))
>>> wordCounts=words.map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)
>>> wordCounts.pprint()
>>> scc.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

套接字

1.使用套接字流作为数据源
nc -lk 9999

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__=="__main__":
    if len(sys.argv)!=3:
        print("Usage:socket.py <hostname> <port>",file=sys.stderr)
        exit(-1)
    sc=SparkContext(appName="Pythonsocket")
    ssc=StreamingContext(sc,1)
    #用于创建一个套接字流类型的输入源
    lines=ssc.socketTextStream(sys.argv[1],int(sys.argv[2]))
    #sys.argv[1]提供了主机地址,sys.argv[2]提供了通信端口号
    counts=lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

2.使用socket编程实现自定义数据源

#创建一个Socket服务器端,发送信息给客户端(上面的socket.py)
import socket
server=socket.socket()
#生成socket对象
server.bind('localhost',9999)
#绑定IP和端口
server.listen(1)
#监听绑定的端口
while 1:
    print("I am waitting the connect...")
    conn,addr=server.accept()
    print("Connect success! Connection is from %s"%addr[0])
    print("Sending data...")
    conn.send("I love Spark I love Hadoop I like Spark".encode())
    conn.close()
    print("Connection is broken")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

RDD队列流

#Spark Streaming会每隔两秒从rddQueue这个队列中取出数据,每隔一秒创建一个RDD放入队列中
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__=="__main__":
    sc=SparkContext(appName="PythonQueueStream")
    ssc=StreamingContext(sc,2)
    #创建一个队列,通过该队列可以把RDD推给一个RDD队列流
    rddQueue=[]
    for i in range(5):
        rddQueue+=[ssc.sparkContext.parallelize([j for j in range(1,1001)],10)]
        time.sleep(1)
    #创建一个RDD队列流
    inputStream=ssc.queueStream(rddQueue)
    mappedStream=inputStream.map(lambda x:(x%10,1))
    #如取出RDD78,就会被转换成一个元祖(8,1),求余
    reducedStream=mappedStream.reduceByKey(lambda a,b:a+b)
    #负责统计每个余数的出现次数
    reducedStream.pprint()
    ssc.start()
    ssc.stop(stopSparkContext=True,stopGraceFully=True)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/701112
推荐阅读
相关标签
  

闽ICP备14008679号