
PySpark: Structured Streaming


Spark Structured Streaming

    1. The main differences between streaming and batch data
    2. Details of the Structured Streaming API
    3. Using Structured Streaming on incoming data and saving the output in memory

Batch vs. Stream

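  • batch data : data collected over a period of time
  • stream processing : processing data in real time or near real time, which is more efficient for time-sensitive cases (e.g. fraud detection)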
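
As a rough illustration of the difference, the plain-Python sketch below computes the same total twice: once over a finished batch, and once incrementally, with an updated result after every record. No Spark is involved. The function names are made up for this example, and the sample values are the time_in_secs values used later in this article.

```python
from typing import Iterable, Iterator, List


def batch_total(records: List[int]) -> int:
    """Batch style: wait for the whole dataset, then compute once."""
    return sum(records)


def stream_totals(records: Iterable[int]) -> Iterator[int]:
    """Stream style: update the result as soon as each record arrives."""
    running = 0
    for value in records:
        running += value
        yield running  # an up-to-date answer exists after every record


if __name__ == "__main__":
    seconds_spent = [300, 10, 500]
    print(batch_total(seconds_spent))          # one answer at the end
    print(list(stream_totals(seconds_spent)))  # one answer per record
```

The streaming version never needs the full list in advance, which is why it suits cases where a late answer is of little use.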

Streaming framework

  • Data input
  • Data processing(real time or near real time)
  • Final output
Data Input
  • ingest and process data continuously
    • Messaging systems : Apache Kafka, Flume, etc
    • File folders/directory : files in a directory are read continuously as streaming data
Data Processing
  • aggregations
  • filtering
  • joins
  • sorting
  • etc
Final Output
  • Output mode : Append or Complete
    • Append mode : adds only the new results to the final output
    • Complete mode : rewrites the entire result table in the final output
  • Output sink
    • File directory sink
    • Console
    • Memory sink
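
The two output modes can be pictured with a small plain-Python simulation. This is only a sketch of the semantics, not how Spark implements them; the function name and the micro-batch contents are hypothetical.

```python
from collections import Counter
from typing import Dict, List, Tuple


def simulate_output_modes(
    micro_batches: List[List[str]],
) -> Tuple[List[List[str]], List[Dict[str, int]]]:
    """Return what an append-style and a complete-style sink would receive.

    append   -> only the rows that arrived in the current micro-batch
    complete -> the whole, updated result table (here: a count per app)
    """
    append_outputs: List[List[str]] = []
    complete_outputs: List[Dict[str, int]] = []
    counts: Counter = Counter()
    for batch in micro_batches:
        append_outputs.append(list(batch))     # new rows only
        counts.update(batch)                   # update the running result
        complete_outputs.append(dict(counts))  # full table, rewritten each time
    return append_outputs, complete_outputs


if __name__ == "__main__":
    batches = [["FB", "Twitter", "Insta"], ["FB", "FB", "FB"]]
    appended, completed = simulate_output_modes(batches)
    print(appended)
    print(completed)
```

In Spark itself, complete mode is only accepted for queries that contain an aggregation, which is why the examples in this article pair it with groupBy.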

Building a Structured App

  • File folders/directory (a streaming application based on a file directory)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('structured_streaming').getOrCreate()

import pyspark.sql.functions as F
from pyspark.sql.types import *

step 1. Create dataset

(contains four columns; CSV or Parquet format)

    1. User ID
    2. App
    3. Time spent (secs)
    4. Age
# Create a data file in the local folder csv_folder (on HDFS, create the folder first: hdfs dfs -mkdir /csv_folder)
df_1 = spark.createDataFrame(
    [("XN203", 'FB', 300, 30),
     ("XN201", 'Twitter', 10, 19),
     ("XN202", 'Insta', 500, 45)],
    ["user_id", "app", "time_in_secs", "age"])
# write.csv() returns None, so write in a separate statement to keep df_1 usable
df_1.write.csv("csv_folder", mode='append')

# Define the schema (streaming file sources need an explicit schema by default)
schema = (StructType()
          .add("user_id", "string")
          .add("app", "string")
          .add("time_in_secs", "integer")
          .add("age", "integer"))

# Read the folder as a stream
data = spark.readStream.option("sep", ",").schema(schema).csv("csv_folder")

step 2. Operations

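# Count the records per app
app_count = data.groupBy('app').count()

# Write the result to an in-memory table named count_query
query = (app_count.writeStream
         .queryName('count_query')
         .outputMode('complete')
         .format('memory')
         .start())

# Wait until the files already in the folder are processed; otherwise the table may still be empty
query.processAllAvailable()

# Use a SQL command to view the output
# toPandas() requires pandas to be installed
# spark.sql("select * from count_query").toPandas().head(5)
spark.sql('select * from count_query').show(5)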

# Filter the FB records and calculate the average time for each user
fb_data = data.filter(data['app'] == 'FB')
# Average time each user spends on FB
fb_avg_time = fb_data.groupBy('user_id').agg(F.avg("time_in_secs"))
fb_query = (fb_avg_time.writeStream
            .queryName('fb_query')
            .outputMode('complete')
            .format('memory')
            .start())
fb_query.processAllAvailable()  # wait for the available files to be processed
spark.sql("select * from fb_query").show()


# Add new data
df_2 = spark.createDataFrame(
    [("XN203", 'FB', 100, 30), ("XN201", 'FB', 10, 19), ("XN202", 'FB', 2000, 45)],
    ["user_id", "app", "time_in_secs", "age"])
df_2.write.csv("csv_folder", mode='append')
# The running query picks up the new file; run the same SQL again to see the updated result
fb_query.processAllAvailable()
spark.sql('select * from fb_query').show()

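# Add more new data
df_3 = spark.createDataFrame(
    [("XN203", 'FB', 1000, 30), ("XN201", 'FB', 10, 19), ("XN202", 'FB', 2000, 45)],
    ["user_id", "app", "time_in_secs", "age"])
df_3.write.csv("csv_folder", mode='append')
# The running query picks up the new file; run the same SQL again to see the updated result
fb_query.processAllAvailable()
spark.sql('select * from fb_query').show()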
Total time users spend on each app, sorted in descending order

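# Sorting is allowed here because the stream is aggregated and written in complete mode
app_df = data.groupBy('app').agg(F.sum('time_in_secs').alias('total_time')).orderBy('total_time', ascending=False)
app_query = app_df.writeStream.queryName('app_wise_query').outputMode('complete').format('memory').start()
app_query.processAllAvailable()  # wait for the available files to be processed
spark.sql("select * from app_wise_query").show()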

Structured Streaming Alternatives

    1. Flink
    2. Apache Beam (originated at Google)
    3. Spark SQL API