from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import explode, split, lit

"""
Save the streaming word counts to a MySQL database, recording the id of each
micro-batch alongside the data.
"""

if __name__ == '__main__':

    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    # Configure the socket data source: set host and port.
    lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

    # Split each line on spaces, one word per row.
    words = lines.select(explode(split(lines.value, " ")).alias("word"))

    wordsCount = words.groupBy("word").count()

    """
    To output to the console instead:
    wordsCount.writeStream.outputMode("complete").format("console").trigger(processingTime="10 seconds").start().awaitTermination()
    """

    # MySQL table structure: word varchar(32), sl int, batch int
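    # For reference, a DDL sketch matching that comment (the column types are
    # an assumption; the table name follows the JDBC options used below):
    #   CREATE TABLE socket (
    #       word  VARCHAR(32),
    #       sl    INT,
    #       batch INT
    #   );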
    PROP = {}
    PROP['driver'] = 'com.mysql.jdbc.Driver'
    PROP['user'] = 'root'
    PROP['password'] = 'root'
    def insert_into_mysql(df: DataFrame, batch):
        print("batch:{} started".format(batch))
        # Tag every row with the micro-batch id; rename "count" to match the sl column.
        data = df.withColumn("batch", lit(batch)).withColumnRenamed("count", "sl")
        """
        Two ways to write. The second one, with batchsize configured, felt faster to
        me; during testing the rows were otherwise submitted to the database one at
        a time, which was slow (possibly a database configuration issue). First way:
        data.write.jdbc(url="jdbc:mysql://localhost:3306/spark", table="socket_test", mode="append", properties=PROP)
        """
        data.write.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("url", "jdbc:mysql://localhost:3306/spark").option("dbtable", "socket").option("user", "root").option("password", "root").option("batchsize", 100).mode("append").save()
        print("batch:{} finished".format(batch))

    # complete mode re-emits the full counts table on every trigger, so each
    # micro-batch appends a fresh snapshot to MySQL, distinguished by "batch".
    wordsCount.writeStream.outputMode("complete").foreachBatch(insert_into_mysql).trigger(processingTime="20 seconds").start().awaitTermination()
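
To exercise the pipeline, start a socket source with nc -lk 9999 before launching the job and type a few lines of words. To verify what landed in MySQL, a one-off batch read from a separate PySpark session works; a minimal sketch, assuming the same URL, table, and credentials as above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
verify = spark.read.format("jdbc") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/spark") \
    .option("dbtable", "socket") \
    .option("user", "root") \
    .option("password", "root") \
    .load()
# Newest micro-batch first; each batch holds a full snapshot of the counts,
# because the stream runs in complete output mode.
verify.orderBy("batch", ascending=False).show()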