赞
踩
- # window函数滑动类型主要可能使用场景如:流量控制,资源保护等,可以明确获知某个时间段内具体数量,了解具体流量高峰数据,如果参数windowDuration和slideDuration一致,则即滚动类型,如果windowDuration大于slideDuration,则存在重复数据计算
-
- # 数据依旧采用前面文章中的generate_log生成数据
-
- from pyspark.sql import SparkSession
- from pyspark.sql.functions import split,lit,window
-
- DATA = "/opt/software/tmp/data"
- if __name__ == '__main__':
-
- spark = SparkSession.buider.getOrCreate()
-
- lines = spark.readStream.format("text").option("seq","\n").load(DATA)
-
- userinfo = lines.select(split(lines.value," ").alias("info"))
- # 第一个为eventtime 第二个为name 第三个为province 第四个为action
- # userinfo['info'][0]等同于userinfo['info'].getIterm(0)
- user = userinfo.select(from_unixtime(userinfo['info'][0]).alias('eventtime'),
- userinfo['info'][1].alias('name'),userinfo['info'][2].alias('province'),
- userinfo['info'][3].alias('action'))
-
- windowDuration = "5 minutes"
- slideDuration = "1 minutes"
-
- provinceCounts = user.groupBy("province",window('eventtime',windowDuration=windowDuration,slideDuration=slideDuration)).count()
- provinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="30 seconds").start().awaitTermination()

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。