# generate_file.py
# Generate test data: 500 files with 1000 records each.
# Record format: eventtime name province action  (event time, user name, province, action)
# e.g. a generated record looks like: 1718000000 ZhaoSan BeiJing login
import os
import random
import shutil
import time

FIRST_NAME = ['Zhao', 'Qian', 'Sun', 'Li', 'Zhou', 'Wu', 'Zheng', 'Wang']
SECOND_NAME = ['San', 'Si', 'Wu', 'Chen', 'Yang', 'Min', 'Jie', 'Qi']
PROVINCE = ['BeiJing', 'ShanDong', 'ShangHai', 'HeNan', 'HaErBin']
ACTION = ['login', 'logout', 'purchase']

PATH = "/opt/software/tmp/"
DATA_PATH = "/opt/software/tmp/data/"

# Initialize the environment: recreate an empty data directory.
def test_Setup():
    if os.path.exists(DATA_PATH):
        shutil.rmtree(DATA_PATH)
    os.mkdir(DATA_PATH)

# Clean up the data and restore the test environment.
def test_TearDown():
    shutil.rmtree(DATA_PATH)

# Write the content to a temp path first, then move the finished file into DATA_PATH,
# so the Structured Streaming file source only ever sees complete files.
def writeAndMove(filename, content):
    with open(PATH + filename, 'wt', encoding='utf-8') as f:
        f.write(content)
    shutil.move(PATH + filename, DATA_PATH + filename)

if __name__ == '__main__':

    test_Setup()

    for i in range(500):
        filename = "user_action_{}.log".format(i)
        # To compare Spark's complete and update output modes, the very first file
        # (i == 0) uses the province "TaiWan" only; it never appears in later files.
        if i == 0:
            province = ['TaiWan']
        else:
            province = PROVINCE
        content = ""
        for _ in range(1000):
            content += "{} {} {} {}\n".format(
                str(int(time.time())),
                random.choice(FIRST_NAME) + random.choice(SECOND_NAME),
                random.choice(province),
                random.choice(ACTION))
        writeAndMove(filename, content)
        time.sleep(10)
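
# check_data.py  (illustrative addition, not part of the original post)
# A quick sanity check of the generated data, assuming generate_file.py has already
# moved at least one file into DATA_PATH: every line should split into exactly four
# space-separated fields (eventtime, name, province, action).
import os

DATA_PATH = "/opt/software/tmp/data/"

sample = os.path.join(DATA_PATH, sorted(os.listdir(DATA_PATH))[0])
with open(sample, encoding='utf-8') as f:
    fields = f.readline().rstrip("\n").split(" ")
assert len(fields) == 4, fields
print(fields)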

# spark_file_test.py
# Read the files under DATA_PATH and count the records per province. The focus is on
# windowed processing, the different outputMode settings, and two sinks: the console
# and MySQL (a "batch" column is added when writing to MySQL). A windowed variant is
# sketched after the script.

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import split, lit, from_unixtime

DATA_PATH = "/opt/software/tmp/data/"

if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()
    # The text source reads one record per line; "lineSep" defaults to "\n" anyway.
    lines = spark.readStream.format("text").option("lineSep", "\n").load(DATA_PATH)
    # Fields are separated by a single space.
    userinfo = lines.select(split(lines.value, " ").alias("info"))
    # Field order: eventtime, name, province, action.
    # userinfo['info'][0] is equivalent to userinfo['info'].getItem(0).
    user = userinfo.select(from_unixtime(userinfo['info'][0]).alias('eventtime'),
                           userinfo['info'][1].alias('name'),
                           userinfo['info'][2].alias('province'),
                           userinfo['info'][3].alias('action'))

    """
    Test 1: write the parsed rows straight to the console. No aggregation is involved,
    so the update output mode is used.
    user.writeStream.outputMode("update").format("console") \
        .trigger(processingTime="8 seconds").start().awaitTermination()
    """

    """
    Test 2: store the data in MySQL. Create the target table first; the column types
    can be checked with user.printSchema() (all four columns are strings here).

    def insert_into_mysql_batch(df: DataFrame, batch):
        if df.count() > 0:
            # Add the micro-batch id as a "batch" column using lit().
            data = df.withColumn("batch", lit(batch))
            data.write.format("jdbc") \
                .option("driver", "com.mysql.jdbc.Driver") \
                .option("url", "jdbc:mysql://localhost:3306/spark") \
                .option("user", "root").option("password", "root") \
                .option("dbtable", "user_log") \
                .option("batchsize", 1000).mode("append").save()
        else:
            pass

    user.writeStream.outputMode("update").foreachBatch(insert_into_mysql_batch) \
        .trigger(processingTime="20 seconds").start().awaitTermination()
    """
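
    # (Illustrative addition, not in the original post.) After test 2 has run, the
    # "batch" column can be verified by reading the table back; the connection details
    # below are assumptions taken from the JDBC options used above.
    """
    verify = spark.read.format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306/spark") \
        .option("user", "root").option("password", "root") \
        .option("dbtable", "user_log").load()
    verify.groupBy("batch").count().show()
    """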

    """
    Test 3: count the records per province and write the result to the console, to
    compare the complete and update output modes. The input was adjusted so that
    province "TaiWan" appears only once (in the very first file): with complete mode
    it shows up in every batch, with update mode it shows up in a single batch only.

    userProvinceCounts = user.groupBy("province").count()
    userProvinceCounts = userProvinceCounts.select(userProvinceCounts['province'],
                                                   userProvinceCounts["count"].alias('sl'))

    # Output mode complete: the full aggregated result is written in every batch.
    # Expected output:
    #   batch 0: TaiWan 1000
    #   batch 1: TaiWan 1000, plus the other provinces with their counts (sl)
    #   batch 2: TaiWan 1000, plus the other provinces with their counts (sl)
    userProvinceCounts.writeStream.outputMode("complete").format("console") \
        .trigger(processingTime="20 seconds").start().awaitTermination()

    # Output mode update: only the rows that changed since the previous batch are
    # written (new or updated keys).
    # Expected output:
    #   batch 0: TaiWan 1000
    #   batch 1: no TaiWan row in the output
    userProvinceCounts.writeStream.outputMode("update").format("console") \
        .trigger(processingTime="20 seconds").start().awaitTermination()
    """
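
    # (Illustrative addition, not in the original post.) The header mentions window-based
    # statistics, which none of the tests above actually use. A minimal sketch of a
    # per-province count over 1-minute event-time windows; the cast of eventtime to
    # timestamp and the 2-minute watermark are assumptions, not taken from the original.
    """
    from pyspark.sql.functions import window, col

    windowed = user.withColumn("eventtime", col("eventtime").cast("timestamp")) \
        .withWatermark("eventtime", "2 minutes") \
        .groupBy(window(col("eventtime"), "1 minute"), col("province")).count()

    windowed.writeStream.outputMode("update").format("console") \
        .option("truncate", "false") \
        .trigger(processingTime="20 seconds").start().awaitTermination()
    """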