
PySpark Structured Streaming file source example

This post walks through a file-based Structured Streaming pipeline in PySpark: a generator script that drops log files into a watched directory, and a streaming job that parses them, aggregates by province, and writes to the console or to MySQL.
# generate_file.py
# Generates the test data: 500 files with 1,000 records each.
# Record format: eventtime name province action
import os
import time
import shutil
import random

FIRST_NAME = ['Zhao', 'Qian', 'Sun', 'Li', 'Zhou', 'Wu', 'Zheng', 'Wang']
SECOND_NAME = ['San', 'Si', 'Wu', 'Chen', 'Yang', 'Min', 'Jie', 'Qi']
PROVINCE = ['BeiJing', 'ShanDong', 'ShangHai', 'HeNan', 'HaErBin']
ACTION = ['login', 'logout', 'purchase']
PATH = "/opt/software/tmp/"
DATA_PATH = "/opt/software/tmp/data/"

# Initialize the environment: recreate the watched directory
def test_Setup():
    if os.path.exists(DATA_PATH):
        shutil.rmtree(DATA_PATH)
    os.mkdir(DATA_PATH)

# Clean up the data and restore the test environment
def test_TearDown():
    shutil.rmtree(DATA_PATH)

# Write the file outside the watched directory, then move it in,
# so the streaming job never sees a half-written file
def writeAndMove(filename, content):
    with open(PATH + filename, 'wt', encoding='utf-8') as f:
        f.write(content)
    shutil.move(PATH + filename, DATA_PATH + filename)

if __name__ == '__main__':
    test_Setup()
    for i in range(500):
        filename = "user_action_{}.log".format(i)
        # To contrast Spark's complete and update output modes, the first
        # file (i == 0) uses the one-off province 'TaiWan'
        if i == 0:
            province = ['TaiWan']
        else:
            province = PROVINCE
        content = ""
        for _ in range(1000):
            content += "{} {} {} {}\n".format(
                str(int(time.time())),
                random.choice(FIRST_NAME) + random.choice(SECOND_NAME),
                random.choice(province),
                random.choice(ACTION))
        writeAndMove(filename, content)
        time.sleep(10)
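Each generated file therefore holds 1,000 space-separated lines. The values below are purely illustrative (your timestamps and random picks will differ):

    1718000000 ZhaoSan BeiJing login
    1718000000 QianSi ShanDong purchase
    1718000000 WuChen HeNan logout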
# spark_file_test.py
# Reads the files under DATA_PATH and counts records per province.
# The interesting parts are the window case, the choice of outputMode,
# and the two sinks (console and MySQL); when saving to MySQL a batch
# column is added to every row.
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import split, lit, from_unixtime

DATA_PATH = "/opt/software/tmp/data/"

if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()
    # One record per line; lineSep is the text source's line delimiter
    lines = spark.readStream.format("text").option("lineSep", "\n").load(DATA_PATH)
    # Fields are separated by a single space
    userinfo = lines.select(split(lines.value, " ").alias("info"))
    # Field order: eventtime, name, province, action
    # userinfo['info'][0] is equivalent to userinfo['info'].getItem(0)
    user = userinfo.select(
        from_unixtime(userinfo['info'][0]).alias('eventtime'),
        userinfo['info'][1].alias('name'),
        userinfo['info'][2].alias('province'),
        userinfo['info'][3].alias('action'))
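    # For reference, user.printSchema() should show four string columns,
    # since split() yields array<string> and from_unixtime() yields a
    # formatted 'yyyy-MM-dd HH:mm:ss' string:
    #   root
    #    |-- eventtime: string (nullable = true)
    #    |-- name: string (nullable = true)
    #    |-- province: string (nullable = true)
    #    |-- action: string (nullable = true)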
  58. """
  59. 测试1:数据直接输出到控制台,由于没有采用聚合,输出模式选择update
  60. user.writeStream.outputMode("update").format("console").trigger(processingTime="8 seconds").start().awaitTermination()
  61. """
  62. """
  63. 测试2:数据存储到数据库,新建数据库表,可以通过printSchema()查看数据类型情况
  64. def insert_into_mysql_batch(df:DataFrame,batch):
  65. if df.count()>0:
  66. # 此处将batch添加到df中,采用lit函数
  67. data = df.withColumn("batch",lit(batch))
  68. data.write.format("jdbc"). \
  69. option("driver","com.mysql.jdbc.Driver"). \
  70. option("url","jdbc:mysql://localhost:3306/spark").option("user","root").\
  71. option("password","root").option("dbtable","user_log").\
  72. option("batchsize",1000).mode("append").save()
  73. else:
  74. pass
  75. user.writeStream.outputMode("update").foreachBatch((insert_into_mysql_batch)).trigger(processingTime="20 seconds").start().awaitTermination()
  76. """
  77. """
  78. 测试3:数据按照省份统计后,输出到控制台,分析complete和update输出模式区别,针对该问题,调整输入,province="TaiWan"只会输入1次,即如果输出方式complete,则每batch都会输出,update的话,只会出现在一个batch
  79. userProvinceCounts = user.groupBy("province").count()
  80. userProvinceCounts = userProvinceCounts.select(userProvinceCounts['province'],userProvinceCounts["count"].alias('sl'))
  81. # 测试输出模式complete:complete将总计算结果都进行输出
  82. """
  83. batch 0
  84. TaiWan 1000
  85. batch 1
  86. TaiWan 1000
  87. 其他省份 sl
  88. batch 2
  89. TaiWan 1000
  90. 其他省份 sl
  91. """ userProvinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="20 seconds").start().awaitTermination()
  92. # 测试输出模式update:update只输出相比上个批次变动的内容(新增或修改)
  93. batch 0
  94. TaiWan 1000
  95. batch 1 中没有TaiWan输出
  96. userProvinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="20 seconds").start().awaitTermination()
  97. """
