# PySpark Structured Streaming example: stream CSV files from a folder,
# run running aggregations, and inspect results via an in-memory sink.
# Imports first (PEP 8), then create the SparkSession used by the whole script.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# Only StructType is used below, so import it explicitly instead of `import *`.
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName('structured_streaming').getOrCreate()
# The sample data contains four columns; the source could be CSV or Parquet format.
# Create a seed data file in the local folder "csv_folder".
# (On HDFS: hdfs dfs -mkdir /csv_folder)
# NOTE: DataFrameWriter.csv() returns None, so the original `df_1 = ...`
# assignment bound None and has been dropped.
spark.createDataFrame(
    [("XN203", 'FB', 300, 30),
     ("XN201", 'Twitter', 10, 19),
     ("XN202", 'Insta', 500, 45)],
    ["user_id", "app", "time_in_secs", "age"],
).write.csv("csv_folder", mode='append')
# Define the schema for the streaming source: CSV files carry no header or
# type information, so readStream requires an explicit schema up front.
schema=StructType().add("user_id","string").add("app","string").add("time_in_secs", "integer").add("age","integer")
# Streaming DataFrame that watches "csv_folder" and picks up new CSV files.
data=spark.readStream.option("sep", ",").schema(schema).csv("csv_folder")
# Running count of records per app over the stream.
app_count = data.groupBy('app').count()

# Write the aggregation to an in-memory table named "count_query".
# outputMode 'complete' re-emits the full aggregation on every trigger.
query = (app_count.writeStream
         .queryName('count_query')
         .outputMode('complete')
         .format('memory')
         .start())

# Block until all currently-available input has been processed; without this
# the SQL query below races the first micro-batch and may see an empty table.
query.processAllAvailable()

# Use a SQL command to view the output.
# toPandas() requires pandas to be installed:
# spark.sql("select * from count_query ").toPandas().head(5)
spark.sql('select * from count_query').show(5)
# Filter down to the 'FB' app and compute the average time each user spends on it.
fb_data = data.filter(data['app'] == 'FB')
fb_avg_time = fb_data.groupBy('user_id').agg(F.avg("time_in_secs"))
fb_query = (fb_avg_time.writeStream
            .queryName('fb_query')
            .outputMode('complete')
            .format('memory')
            .start())
# Let the stream consume the available files before querying; otherwise the
# in-memory table may still be empty.
fb_query.processAllAvailable()
spark.sql("select * from fb_query ").show()
# Append new data files; the running 'fb_query' aggregation picks them up.
# (DataFrameWriter.csv() returns None, so the `df_2 = ...` binding is dropped.)
spark.createDataFrame(
    [("XN203", 'FB', 100, 30), ("XN201", 'FB', 10, 19), ("XN202", 'FB', 2000, 45)],
    ["user_id", "app", "time_in_secs", "age"],
).write.csv("csv_folder", mode='append')
# Re-run the existing query after the stream has processed the new files.
fb_query.processAllAvailable()
spark.sql('select * from fb_query').show()
# Append a second batch; the same running query incorporates it as well.
# (DataFrameWriter.csv() returns None, so the `df_2 = ...` binding is dropped.)
spark.createDataFrame(
    [("XN203", 'FB', 1000, 30), ("XN201", 'FB', 10, 19), ("XN202", 'FB', 2000, 45)],
    ["user_id", "app", "time_in_secs", "age"],
).write.csv("csv_folder", mode='append')
# Re-run the existing query after the stream has processed the new files.
fb_query.processAllAvailable()
spark.sql('select * from fb_query').show()
# Sum the total time users spent on each app and sort the result.
# Total time spent per app, sorted descending by that total.
app_df = (data.groupBy('app')
          .agg(F.sum('time_in_secs').alias('total_time'))
          .orderBy('total_time', ascending=False))
app_query = (app_df.writeStream
             .queryName('app_wise_query')
             .outputMode('complete')
             .format('memory')
             .start())
# Drain the available input before querying the in-memory sink so the
# table reflects the data written above.
app_query.processAllAvailable()
spark.sql("select * from app_wise_query ").show()
# Source page footer: Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.