当前位置:   article > 正文

Spark创建DataFrame的三种方法_spark.createdataframe

spark.createdataframe

关系数据库的表(Table)一样,DataFrame是Spark中对带模式(schema)行列数据的抽象。DateFrame广泛应用于使用SQL处理大数据的各种场景。创建DataFrame有很多种方法,比如从本地List创建、从RDD创建或者从源数据创建,下面简要介绍创建DataFrame的三种方法。

方法一,Spark中使用toDF函数创建DataFrame

通过导入(importing)Spark sql implicits, 就可以将本地序列(seq), 数组或者RDD转为DataFrame。只要这些数据的内容能指定数据类型即可。

本地seq + toDF创建DataFrame示例:

  1. import sqlContext.implicits._
  2. val df = Seq(
  3. (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  4. (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
  5. ).toDF("int_column", "string_column", "date_column")

注意:如果直接用toDF()而不指定列名字,那么默认列名为"_1", "_2", ...

通过case class + toDF创建DataFrame的示例

  1. // sc is an existing SparkContext.
  2. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  3. // this is used to implicitly convert an RDD to a DataFrame.
  4. import sqlContext.implicits._
  5. // Define the schema using a case class.
  6. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
  7. // you can use custom classes that implement the Product interface.
  8. case class Person(name: String, age: Int)
  9. // Create an RDD of Person objects and register it as a table.
  10. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
  11. people.registerTempTable("people")
  12. // 使用 sqlContext 执行 sql 语句.
  13. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
  14. // 注:sql()函数的执行结果也是DataFrame,支持各种常用的RDD操作.
  15. // The columns of a row in the result can be accessed by ordinal.
  16. teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

方法二,Spark中使用createDataFrame函数创建DataFrame

SqlContext中使用createDataFrame也可以创建DataFrame。跟toDF一样,这里创建DataFrame的数据形态也可以是本地数组或者RDD。

通过row+schema创建示例

  1. import org.apache.spark.sql.types._
  2. val schema = StructType(List(
  3. StructField("integer_column", IntegerType, nullable = false),
  4. StructField("string_column", StringType, nullable = true),
  5. StructField("date_column", DateType, nullable = true)
  6. ))
  7. val rdd = sc.parallelize(Seq(
  8. Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  9. Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
  10. ))
  11. val df = sqlContext.createDataFrame(rdd, schema)

方法三,通过文件直接创建DataFrame

使用parquet文件创建

val df = sqlContext.read.parquet("hdfs:/path/to/file")

使用json文件创建

  1. val df = spark.read.json("examples/src/main/resources/people.json")
  2. // Displays the content of the DataFrame to stdout
  3. df.show()
  4. // +----+-------+
  5. // | age| name|
  6. // +----+-------+
  7. // |null|Michael|
  8. // | 30| Andy|
  9. // | 19| Justin|
  10. // +----+-------+

使用csv文件,spark2.0+之后的版本可用

  1. //首先初始化一个SparkSession对象
  2. val spark = org.apache.spark.sql.SparkSession.builder
  3. .master("local")
  4. .appName("Spark CSV Reader")
  5. .getOrCreate;
  6. //然后使用SparkSessions对象加载CSV成为DataFrame
  7. val df = spark.read
  8. .format("com.databricks.spark.csv")
  9. .option("header", "true") //reading the headers
  10. .option("mode", "DROPMALFORMED")
  11. .load("csv/file/path"); //.csv("csv/file/path") //spark 2.0 api
  12. df.show()

补充:spark数据集的演变:
spark_dataframe

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/693329
推荐阅读
相关标签
  

闽ICP备14008679号