当前位置:   article > 正文

Spark_SQL-DataFrame数据写出以及读写数据库(以MySQl为例)_pyspark dataframe写mysql

pyspark dataframe写mysql

                  一、数据写出

        (1)SparkSQL统一API写出DataFrame数据

二、写出MySQL数据库


一、数据写出

        (1)SparkSQL统一API写出DataFrame数据

        统一API写法:

       常见源写出:

  1. # cording:utf8
  2. from pyspark.sql import SparkSession
  3. from pyspark.sql.types import StructType, IntegerType, StringType
  4. import pyspark.sql.functions as F
  5. if __name__ == '__main__':
  6. spark = SparkSession.builder.\
  7. appName('write').\
  8. master('local[*]').\
  9. getOrCreate()
  10. sc = spark.sparkContext
  11. # 1.读取文件
  12. schema = StructType().add('user_id', StringType(), nullable=True).\
  13. add('movie_id', IntegerType(), nullable=True).\
  14. add('rank', IntegerType(), nullable=True).\
  15. add('ts', StringType(), nullable=True)
  16. df = spark.read.format('csv').\
  17. option('sep', '\t').\
  18. option('header', False).\
  19. option('encoding', 'utf-8').\
  20. schema(schema=schema).\
  21. load('../input/u.data')
  22. # write text 写出,只能写出一个列的数据,需要将df转换为单列df
  23. df.select(F.concat_ws('---', 'user_id', 'movie_id', 'rank', 'ts')).\
  24. write.\
  25. mode('overwrite').\
  26. format('text').\
  27. save('../output/sql/text')
  28. # write csv
  29. df.write.mode('overwrite').\
  30. format('csv').\
  31. option('sep',';').\
  32. option('header', True).\
  33. save('../output/sql/csv')
  34. # write json
  35. df.write.mode('overwrite').\
  36. format('json').\
  37. save('../output/sql/json')
  38. # write parquet
  39. df.write.mode('overwrite').\
  40. format('parquet').\
  41. save('../output/sql/parquet')

二、写出MySQL数据库

        API写法:

        注意:

        ①jdbc连接字符串中,建议使用useSSL=false 确保连接可以正常连接( 不使用SSL安全协议进行连接)

        ②jdbc连接字符串中,建议使用useUnicode=true 来确保传输中不出现乱码

        ③save()不要填参数,没有路径,是写出数据库

        ④dbtable属性:指定写出的表名

  1. # cording:utf8
  2. from pyspark.sql import SparkSession
  3. from pyspark.sql.types import StructType, IntegerType, StringType
  4. import pyspark.sql.functions as F
  5. if __name__ == '__main__':
  6. spark = SparkSession.builder.\
  7. appName('write').\
  8. master('local[*]').\
  9. getOrCreate()
  10. sc = spark.sparkContext
  11. # 1.读取文件
  12. schema = StructType().add('user_id', StringType(), nullable=True).\
  13. add('movie_id', IntegerType(), nullable=True).\
  14. add('rank', IntegerType(), nullable=True).\
  15. add('ts', StringType(), nullable=True)
  16. df = spark.read.format('csv').\
  17. option('sep', '\t').\
  18. option('header', False).\
  19. option('encoding', 'utf-8').\
  20. schema(schema=schema).\
  21. load('../input/u.data')
  22. # 2.写出df到MySQL数据库
  23. df.write.mode('overwrite').\
  24. format('jdbc').\
  25. option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=false&useUnicode=true&serverTimezone=GMT%2B8').\
  26. option('dbtable', 'movie_data').\
  27. option('user', 'root').\
  28. option('password', '123456').\
  29. save()
  30. # 读取
  31. df2 = spark.read.format('jdbc'). \
  32. option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=false&useUnicode=true&serverTimezone=GMT%2B8'). \
  33. option('dbtable', 'movie_data'). \
  34. option('user', 'root'). \
  35. option('password', '123456'). \
  36. load()
  37. # 查看读取结果
  38. df2.printSchema()
  39. df2.show()
  40. '''
  41. JDBC写出,会自动创建表的
  42. 因为DataFrame中的有表结构信息,StructType记录的 各个字段的名称 类型 和是否运行为空
  43. '''

        保存结果:

        读取结果:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/558710
推荐阅读
相关标签
  

闽ICP备14008679号