赞
踩
结构化流处理是 Spark 中用于处理实时流数据的 API。它提供了类似于对静态数据集进行操作的高级抽象,允许你以类似的方式处理实时数据流。在 PySpark 中,你可以使用结构化流处理来处理实时数据,并且可以使用类似于对静态 DataFrame 进行操作的方式来处理实时流数据
from pyspark.sql import SparkSession # 创建ss对象 ss = SparkSession.builder.getOrCreate() # 读取文件中的流数据 # Option 'basePath' must be a directory print("====================text==================") df_text = ss.readStream.text(path='/data_text') print("====================json==================") # 和离线计算读取不一样的地方: 需要添加schema信息, 不会自动解析 df_json = ss.readStream.load(path='/data_json', format='json', schema='id int,name string,age int,gender string') print("====================csv==================") # 设置读取流数据参数, 每次流式计算, 每个文件是一个批次 # 目前目录下有四个csv文件, 分为四个batch执行 options = {'maxFilesPerTrigger': 1} df_csv = ss.readStream.format('csv').option('sep', ';').schema('id int,name string,age int,gender string').load( path='/data_csv', **options) # 结果输出在控制台 df_csv.writeStream.start(format='console', outputMode='append').awaitTermination()
append 适合没有聚合操作的计算结果输出,将数据输出到文件时需要用append
complete 适合进行聚合操作,并且显示所有的数据计算结果
update 适合进行聚合操作,只展示新增数据的结果
df_res.writeStream.start(format='console', outputMode='complete').awaitTermination()
df_res.writeStream.start(format='console', outputMode='update').awaitTermination()
File Sink 把结果输出到文件中,仅支持追加 append模式
Kafka Sink 把结果输出到kafka的topic中,append complete update都支持
Console Sink 直接在终端中显示,append complete update都支持
foreach: 将流数据中的每行数据经过自定函数进行处理,自定义函数需要接受一个df中的row对象,append complete update都支持
foreachbatch: 将流数据中的每一小批数据经过自定义函数进行处理,排序操作,将聚合操作的结果保存到文件中,保存到mysql,自定义函数需要接受两个参数,df对象和batch_id,append complete update都支持
# 函数
def foreach_batch_function(df,df_id):
# Transform and write batchDF
pass
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
触发器,决定多久执行一次,在流式处理中,等一会(等多久)就是由触发器决定
# 指定触发器trigger
# processingTime 指定固定时间间隔 5 seconds 5秒 5秒后会请求数据,计算超过5秒 计算结束后请求 没有数据阻塞等待新数据
# df.writeStream.trigger(processingTime='5 seconds').start(format='console', outputMode='append').awaitTermination()
# 指定触发器trigger
# once=True请求一次,请求后就结束程序
# df.writeStream.trigger(once=True).start(format='console', outputMode='append').awaitTermination()
# 指定触发器trigger
# continuous 指定固定时间间隔 5 seconds 5秒 达到5秒就会请求数据,没有数据则返回空
df.writeStream.trigger(continuous='5 seconds').start(format='console', outputMode='append').awaitTermination()
当获取到数据进行计算时,有可能数据在计算时会计算失败。此时spark会重新进行计算,就需要知道要计算哪个数据,就要借助checkpoint检查点机制,将当前计算的信息保存起来,方便重新进行计算。
保证spark处理数据的容错性。
# checkpoint 流式计算的容错机制 from pyspark.sql import SparkSession, functions as F ss = SparkSession.builder.getOrCreate() # 1-流式读取数据 df = ss.readStream.csv(path='/data_csv', schema='id int,name string,age int,gender string') # 2- 数据计算 df_group = df.groupby('gender').agg(F.avg('age').alias('avg_age')) """ 在计算时会有多个计算步骤,如果某个计算步骤在计算时出错,整个计算任务就失败了,为了避免计算失败,可以指定checkpoint,会将计算过程中的所有信息记录下来,当计算失败时,可以根据记录的信息重新计算 """ option = { 'checkpointLocation': 'hdfs://node1:8020/pydata_checkpoint' } def to_json(df, batch_id): df.write.json(path='/checkpoint_json', mode='append') # 3-输出结果 df_group.writeStream.foreachBatch(to_json).start(outputMode='complete', **option).awaitTermination()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。