赞
踩
(1)微批处理
(2)持续处理
1.步骤1:导入pyspark模块
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
由于程序中需要用到拆分字符串和展开数组内的所有单词的功能,所以引用了来自pyspark.sql.functions里面的split和explode函数。
2.步骤2:创建SparkSession对象
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')
3.步骤3:创建输入数据源
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
4.步骤4:定义流计算过程
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
wordCounts = words.groupBy("word").count()
5.步骤5:启动流计算并输出结果
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.trigger(processingTime="8 seconds") \
.start()
query.awaitTermination()
(1)创建程序生成JSON格式的File源测试数据
(2)创建程序对数据进行统计
(3)测试运行程序
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。