当前位置:   article > 正文

pySpark创建DataFrame的方式_pyspark 创建视图

pyspark 创建视图

                              pySpark创建DataFrame的方式

有时候需要在迭代的过程中将多个dataframe进行合并(union),这时候需要一个空的初始dataframe。创建空dataframe可以通过spark.createDataFrame()方法来创建:

  1. # 先定义dataframe各列的数据类型
  2. from pyspark.sql.types import *
  3. schema = StructType([
  4. StructField("a", IntegerType(), True),
  5. StructField("b", IntegerType(), True),
  6. StructField("c", IntegerType(), True)])
  7. # 通过定义好的dataframe的schema来创建空dataframe
  8. df1 = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
  9. df2 = sc.parallelize([(4,5,6)]).toDF(['a','b','c'])
  10. df1.union(df2).show()
  11. +---+---+---+
  12. | a| b| c|
  13. +---+---+---+
  14. | 4| 5| 6|
  15. +---+---+---+

通过上面的方法可以创建指定列名和数据类型的dataframe。但是有时候我们需要创建的dataframe的数据结构是跟某个dataframe结构是相同的,而这个结构有非常复杂,难以直接创建,这时候就可以直接使用已有的dataframe的schema来创建新的dataframe了:

  1. df3 = spark.createDataFrame(spark.sparkContext.emptyRDD(), df2.schema)
  2. df3.union(df2).show()
  3. +---+---+---+
  4. | a| b| c|
  5. +---+---+---+
  6. | 4| 5| 6|
  7. +---+---+---+

对于Spark 2.0来说,所有的功能都可以以类SparkSession类作为切入点。要创建SparkSession,只需要使用SparkSession.builder()

使用Spark Session,应用程序可以从现有的RDD,Hive表或Spark数据源创建DataFrame,Spark SQL可以使用DataFrame接口在各种数据源上运行。使用Spark SQL DataFrame,我们可以创建一个临时视图。在DataFrame的临时视图中,可以对数据运行SQL查询。

Spark SQL DataFrame API没有提供编译时类型安全性。因此,如果结构未知,就无法操纵数据,一旦我们将域对象转换为数据帧,就不可能重新生成域对象; Spark SQL中的DataFrame API提高了Spark的性能和可伸缩性。它避免了为数据集中的每一行构造单个对象的垃圾收集成本。

  1. from pyspark import SparkContext
  2. from pyspark.sql import SparkSession
  3. from pyspark.sql.types import StructType, StructField, LongType, StringType
  4. from pyspark.sql import Row
  5. from pyspark.sql import Column
  6. import pandas as pd
  7. import numpy as np
  8. # 创建SparkSession连接到Spark集群-SparkSession.builder.appName('name').getOrCreate()
  9. spark=SparkSession \
  10. .builder \
  11. .appName('my_app_name') \
  12. .getOrCreate()
  13. # 创建DataFrame,可以从不同的数据创建,以下进行对个数据源读取创建说明
  14. def create_json_file():
  15. df=pd.DataFrame(np.random.rand(5,5),columns=['a','b','c','d','e']).applymap(lambda x: int(x*10))
  16. file=r"random.csv"
  17. df.to_csv(file,index=False)
  18. def create_df_from_rdd():
  19. # 从集合中创建新的RDD
  20. stringCSVRDD = spark.sparkContext.parallelize([
  21. (123, "Katie", 19, "brown"),
  22. (456, "Michael", 22, "green"),
  23. (789, "Simone", 23, "blue")])
  24. # 设置dataFrame将要使用的数据模型,定义列名,类型和是否为能为空
  25. schema = StructType([StructField("id", LongType(), True),
  26. StructField("name", StringType(), True),
  27. StructField("age", LongType(), True),
  28. StructField("eyeColor", StringType(), True)])
  29. # 创建DataFrame
  30. swimmers = spark.createDataFrame(stringCSVRDD,schema)
  31. # 注册为临时表
  32. swimmers.registerTempTable("swimmers")
  33. # 使用Sql语句
  34. data=spark.sql("select * from swimmers")
  35. # 将数据转换List,这样就可以查看dataframe的数据元素的样式
  36. print(data.collect())
  37. # 以表格形式展示数据
  38. data.show()
  39. print("{}{}".format("swimmer numbers : ",swimmers.count()) )
  40. def create_df_from_json():
  41. '''
  42. read的类型是DataFrameReader
  43. '''
  44. df = spark.read.json('pandainfo.json')
  45. df.show()
  46. def create_df_from_csv():
  47. df=spark.read.csv('random.csv',header=True, inferSchema=True)
  48. df.show()
  49. def create_df_from_postgres():
  50. """
  51. format : 指定数据源格式 - 如 jdbc , json , csv等
  52. options: 为数据源添加相关特性选项
  53. """
  54. df=spark.read.format('jdbc').options(
  55. url='jdbc:postgresql://localhost:5432/northwind',
  56. dbtable='public.orders',
  57. user='postgres',
  58. password='iamroot'
  59. ).load()
  60. df.show()
  61. def create_df_from_mysql():
  62. """
  63. """
  64. df=spark.read.format('jdbc').options(
  65. url='jdbc:mysql://localhost:3306',
  66. dbtable='mysql.db',
  67. user='root',
  68. password='iamneo'
  69. ).load()
  70. df.show()
  71. def create_df_from_pandas():
  72. """
  73. 从Python pandas获取数据
  74. """
  75. df = pd.DataFrame(np.random.random((4,4)))
  76. spark_df = spark.createDataFrame (df,schema=['a','b','c','d'])
  77. spark_df.show()
  78. def create_df_from_hive(hive):
  79. # 创建支持Hive的Spark Session
  80. appName = "PySpark Hive Example"
  81. master = "local"
  82. spark = SparkSession.builder \
  83. .appName(appName) \
  84. .master(master) \
  85. .enableHiveSupport() \
  86. .getOrCreate()
  87. df = spark.sql("select * from test_db.test_table")
  88. df.show()
  89. # 将数据保存到Hive新表
  90. df.write.mode("overwrite").saveAsTable("test_db.test_table2")
  91. # 查看数据
  92. spark.sql("select * from test_db.test_table2").show()
  93. if __name__=='__main__':
  94. create_json_file()
  95. create_df_from_rdd()
  96. create_df_from_csv()
  97. create_df_from_json()
  98. create_df_from_db()
  99. create_df_from_mysql()
  100. create_df_from_pandas()

 

参考:
https://stackoverflow.com/questions/54503014/how-to-get-the-schema-definition-from-a-dataframe-in-pyspark

https://www.jianshu.com/p/f79838ddb534

https://blog.csdn.net/sinat_26811377/article/details/101217071



 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/632433
推荐阅读
相关标签
  

闽ICP备14008679号