当前位置:   article > 正文

Pyspark读写csv,txt,json,xlsx,xml,avro等文件_pyspark txt

pyspark txt

1. Spark读写txt文件

读:

  1. df = spark.read.text("/home/test/testTxt.txt").show()
  2. +-------------+
  3. | value|
  4. +-------------+
  5. | a,b,c,d|
  6. |123,345,789,5|
  7. |34,45,90,9878|
  8. +-------------+

2. Spark读写csv文件

读:

  1. # 文件在hdfs上的位置
  2. file_path = r"/user/lanyue/data.csv"
  3. # 方法一
  4. # 推荐这种,指定什么文件格式都可以,只需要修改参数format即可
  5. # 不同的格式其load函数会有不同,用的时候请自行搜索。
  6. df = spark.read.format("csv").load(file_path, header=True, inferSchema=True, encoding="utf-8", sep=',')
  7. # sep=',',表示指定分隔符为逗号,同参数delimiter。
  8. # header=TRUE,表示数据的第一行为列名
  9. # inferSchema,表示是否对字段类型进行推测。=False,默认读取后都按照文本字符处理。=True表示自动推断schema。
  10. # 或者下面这种形式。这两种形式都可以
  11. df = spark.read.format("csv").option("encoding","utf-8").option("header",True).load(file_path, schema=schema) # 使用指定的schema
  12. # 方法二
  13. df = spark.read.csv(file_path, encoding='utf-8', header=True, inferSchema=True)
  14. df = spark.read.csv(file_path, encoding='utf-8', header=True, schema=schema)
  15. # 如果想指定文件格式是json,那就是spark.read.json,其他类似

写:

  1. # 保存在【hdfs上】,以csv文件的格式。指定什么文件格式都可以,只需要修改参数format即可
  2. df.repartition(1).write.mode('append').format("csv").option("encoding","utf-8").option("header",True).save("/lanyue/data.csv")
  3. # mode,保存模式:ovewriter重写、append文件末尾追加、error如果文件存在抛出异常、ignore如果文件存在忽略不更新
  4. # repartition, 在yarn模式下,Spark会根据hdfs文件的块数据大小来划分默认的分区数目,但是我们也可以自己设置分区数目,使用参数repartition。=1表示只保存成一个数据块
  5. # 或者
  6. df.write.csv("/lanyue/data.csv", sep="\t", encoding="utf-8", mode='overwrite')
  7. # 如果想指定文件格式是json,那就是df.write.json,其他类似
  8. # 通过指定参数sep,来指定分隔符,可以是",", "\t","\x01"等。同参数delimiter。

3. Spark读写parquet文件

读:

  1. file = "/user/muzili/data.parquet"
  2. spark_df=spark.read.parquet(file)
  3. df.show()

写:

spark_df.write.parquet(path=file,mode='overwrite')

4. Spark读写json文件

读:

  1. file = "/user/muzili/data.json"
  2. df = spark.read.json(file)
  3. df.show()

写:

df.repartition(1).write.mode('append').format("json").option("encoding","utf-8").option("header",True).save("/user/muzili/data.json")

5. Spark读写excel文件

读:

写:

6. Spark读写xml文件

读:

写:

7. Spark读写orc文件

读:

写:

8. Spark读写avro文件

读:

写:

9. Spark读写mysql中的表

读:

  1. url="jdbc:mysql://host:port/database"
  2. table="table_name"
  3. driver="com.mysql.jdbc.Driver"
  4. user="XXX"
  5. password="XXX"
  6. df = spark.read.format("jdbc")
  7. .option("url",url) # database地址,格式为jdbc:mysql://主机:端口/数据库
  8. .option("dbtable",table) # 表名
  9. .option("user",user)
  10. .option("password",password)
  11. .option("driver",driver)
  12. .load()
  13. # 或者以下形式
  14. df = spark.read.format('jdbc').options(url="jdbc:mysql://host:port/database", # database地址
  15. driver="com.mysql.jdbc.Driver",
  16. dbtable="table_name",
  17. user="XXX",
  18. password="XXX").load()
  19. # 或者以下形式
  20. # mysql的相关配置
  21. prop = {'user': 'xxx',
  22. 'password': 'xxx',
  23. 'driver': 'com.mysql.jdbc.Driver'}
  24. url = 'jdbc:mysql://host:port/database' # database地址
  25. df = spark.read.jdbc(url=url, table='mysql_table_name', properties=prop)

写:

  1. # 会自动对齐字段,也就是说,spark_df 的列不一定要全部包含MySQL的表的全部列才行
  2. prop = {'user': 'xxx',
  3. 'password': 'xxx',
  4. 'driver': 'com.mysql.jdbc.Driver'}
  5. url = 'jdbc:mysql://host:port/database' # database地址
  6. df.write.jdbc(url=url, table='table_name', mode='append', properties=prop)
  7. # append 追加方式
  8. # 或者以下形式
  9. df.write.format("jdbc")
  10. .option("url","jdbc:mysql://host:port/database") # database地址
  11. .option("dbtable","table_name")
  12. .option("user",user)
  13. .option("password",password)
  14. .option("driver",driver)
  15. .option("batchsize","1000").mode("overwrite") # overwrite 清空表再导入
  16. .save()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/609210
推荐阅读
相关标签
  

闽ICP备14008679号