
PySpark Structured Streaming: saving results to MySQL (socket word-count example, with batch number)

  from pyspark.sql import SparkSession, DataFrame
  from pyspark.sql.functions import explode, split, lit

  """
  Save Structured Streaming results to a MySQL database, recording the
  streaming batch number alongside each row.
  """

  if __name__ == '__main__':
      spark = SparkSession.builder.getOrCreate()
      spark.sparkContext.setLogLevel("WARN")

      # Configure the socket data source: host and port
      lines = spark.readStream.format("socket") \
          .option("host", "localhost").option("port", 9999).load()
      words = lines.select(explode(split(lines.value, " ")).alias("word"))
      wordsCount = words.groupBy("word").count()

      """
      To print to the console instead:
      wordsCount.writeStream.outputMode("complete").format("console") \
          .trigger(processingTime="10 seconds").start().awaitTermination()
      """

      # MySQL table schema: word varchar(32), sl int, batch int
      PROP = {}
      PROP['driver'] = 'com.mysql.jdbc.Driver'
      PROP['user'] = 'root'
      PROP['password'] = 'root'

      def insert_into_mysql(df: DataFrame, batch):
          print("batch:{} is start".format(batch))
          data = df.withColumn("batch", lit(batch)).withColumnRenamed("count", "sl")
          """
          Two ways to write. The second (with batchsize set) felt faster in my
          tests, possibly due to database configuration: rows were otherwise
          submitted to the database one at a time, which was slow.
          data.write.jdbc(url="jdbc:mysql://localhost:3306/spark",
                          table="socket_test", mode="append", properties=PROP)
          """
          data.write.format("jdbc") \
              .option("driver", "com.mysql.jdbc.Driver") \
              .option("url", "jdbc:mysql://localhost:3306/spark") \
              .option("dbtable", "socket") \
              .option("user", "root") \
              .option("password", "root") \
              .option("batchsize", 100) \
              .mode("append").save()
          print("batch:{} is end".format(batch))

      wordsCount.writeStream.outputMode("complete") \
          .foreachBatch(insert_into_mysql) \
          .trigger(processingTime="20 seconds").start().awaitTermination()
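For readers without a Spark cluster at hand, the per-batch computation that `explode(split(...))` plus `groupBy("word").count()` performs can be mimicked in plain Python; this is only a sketch of the logic, not Spark's actual execution:

```python
# Minimal stand-in for the streaming query's per-batch logic:
# split each incoming line on spaces, "explode" into individual
# words, then count occurrences of each word.
from collections import Counter

def word_count(lines):
    """Return {word: count} for an iterable of text lines."""
    counts = Counter()
    for line in lines:
        counts.update(line.split(" "))
    return dict(counts)

print(word_count(["hello world", "hello spark"]))
# → {'hello': 2, 'world': 1, 'spark': 1}
```

In "complete" output mode Spark hands `foreachBatch` the full updated result table each trigger, so every batch written to MySQL carries the cumulative counts tagged with that batch's number.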

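The speed difference the comment attributes to `batchsize` comes from JDBC batching: without it each row costs a round-trip to the database, while a batched write submits many rows per round-trip. The same effect can be illustrated in plain Python with the standard-library `sqlite3` module standing in for MySQL (the table layout follows the article's schema; this is an analogy, not the Spark code path):

```python
# Illustration of batched inserts: executemany submits all rows in one
# call, analogous to the JDBC "batchsize" option in the Spark job above.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE socket (word TEXT, sl INTEGER, batch INTEGER)")

# Rows as (word, sl, batch) tuples, matching the article's table schema
rows = [("hello", 2, 0), ("world", 1, 0), ("spark", 1, 0)]

# Batched insert: one call carries many rows
conn.executemany("INSERT INTO socket VALUES (?, ?, ?)", rows)
conn.commit()

print(conn.execute("SELECT COUNT(*) FROM socket").fetchone()[0])  # → 3
```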