
Two Ways to Create a DataFrame from an RDD


1. Via reflection (inferring the schema from a case class)

The Scala interface for Spark SQL supports automatically converting an RDD of case-class objects into a DataFrame. The case class defines the schema of the table: the names of the case-class parameters are read via reflection and become the column names. Case classes can also be nested or contain complex types such as Seqs or Arrays (a short sketch of such a case class appears at the end of this section). The RDD is implicitly converted to a DataFrame and registered as a table, and the table can then be used in subsequent SQL statements.

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)
 
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

// The case class defines the schema: its parameter names become the column names.
case class Person(name: String, age: Integer)

object RDD2DataFrameByReflection {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkSQL").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Read the text file and map each line to a Person.
    val fileRDD: RDD[String] = sc.textFile("examples/src/main/resources/people.txt")
    val personRDD: RDD[Person] = fileRDD.map(_.split(",")).map(p => Person(p(0).trim, p(1).trim.toInt))

    // toDF() is provided by the implicits of sqlContext.
    import sqlContext.implicits._
    val df: DataFrame = personRDD.toDF()
    df.registerTempTable("student")

    val result: DataFrame = sqlContext.sql("select * from student")
    result.show()

    // Access columns by index, by name, or as a Map of several columns at once.
    result.map(t => "name:" + t(0)).foreach(println)
    result.map(_.getAs[String]("name")).foreach(println)
    result.map(_.getValuesMap[Any](List("name", "age"))).foreach(println)
  }
}
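As noted above, the case class used for reflective conversion may be nested or contain complex types such as Seqs or Arrays. Below is a minimal sketch under the same Spark 1.x SQLContext API; the Address and Person2 case classes and the in-memory sample row are made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical nested case classes: Address is embedded in Person2,
// and hobbies is a complex (Seq) field.
case class Address(city: String, street: String)
case class Person2(name: String, age: Int, hobbies: Seq[String], address: Address)

object NestedCaseClassExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("nested").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Sample data built in memory instead of being read from a file.
    val df = sc.parallelize(Seq(
      Person2("Justin", 19, Seq("spark", "scala"), Address("Beijing", "Main St"))
    )).toDF()

    df.printSchema()   // hobbies becomes an array<string> column, address a struct column
    df.registerTempTable("people2")
    // Nested struct fields are addressed with dot notation.
    sqlContext.sql("SELECT name, address.city FROM people2").show()
  }
}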


2. By converting the RDD to an RDD of Rows (programmatically specifying the schema)

When case classes cannot be defined ahead of time (for example, the structure of the records is encoded in a string, or a text dataset will be parsed and its fields projected differently for different users), a DataFrame is created programmatically in three steps:

(1) Create an RDD of Rows from the original RDD.

(2) Create a schema (a StructType) that matches the structure of the Rows created in step 1.

(3) Apply the schema to the Row RDD with createDataFrame to obtain a DataFrame.

The examples below follow these steps; a variant in which the schema string also carries column types is sketched at the end of this section.

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)


import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

object RDD2DataFrameByProgrammatically {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkSQL").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Step 1: create an RDD of Rows from the original RDD.
    val fileRDD: RDD[String] = sc.textFile("examples/src/main/resources/people.txt")
    val rowRDD: RDD[Row] = fileRDD.map(_.split(",")).map(p => Row(p(0).trim, p(1).trim.toInt))

    // Step 2: define the schema that matches the structure of the Rows.
    val schema = StructType(
      Array(
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )

    // Step 3: apply the schema to the Row RDD.
    val df: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
    df.registerTempTable("student")
    sqlContext.sql("select * from student").show()
  }
}
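In the snippets above the schema string only carried column names and every field was typed as String. The sketch below shows how a typed schema could be built from a richer string; the "name:string,age:int" format and the toType helper are made-up conventions for illustration, not a Spark API:

import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

object SchemaFromString {
  // Hypothetical convention: "colName:colType" pairs separated by commas, e.g. "name:string,age:int".
  def toType(typeName: String): DataType = typeName match {
    case "int" => IntegerType
    case _     => StringType
  }

  def parseSchema(schemaString: String): StructType =
    StructType(schemaString.split(",").map { field =>
      val Array(colName, colType) = field.split(":")
      StructField(colName.trim, toType(colType.trim), nullable = true)
    })

  def main(args: Array[String]): Unit = {
    // Prints the generated StructType with a StringType field and an IntegerType field.
    println(parseSchema("name:string,age:int"))
  }
}

The resulting StructType can then be passed to sqlContext.createDataFrame(rowRDD, schema) exactly as in the example above, as long as the values in each Row match the declared column types.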



 
