1. Inferring the schema via reflection
The Scala interface for Spark SQL supports automatically converting an RDD of case class instances into a DataFrame. The case class defines the schema of the table: the names of the case class's constructor parameters are read via reflection and become the column names. Case classes can also be nested or contain complex types such as Seqs or Arrays. The RDD can be implicitly converted to a DataFrame and registered as a table, which can then be used in subsequent SQL statements.
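Both code examples in this section read Spark's bundled sample file examples/src/main/resources/people.txt, which stores one comma-separated name,age record per line. In the copy shipped with Spark it looks like this (verify against your local distribution):

Michael, 29
Andy, 30
Justin, 19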
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// => Map("name" -> "Justin", "age" -> 19)
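The same approach as a self-contained application, with the SparkConf, SparkContext, and SQLContext set up explicitly: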
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

// Defined at top level so Spark's reflection can read its fields.
case class Person(name: String, age: Integer)

object RDD2DataFrameByReflection {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkSQL").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Parse each line of "name, age" into a Person.
    val fileRDD: RDD[String] = sc.textFile("examples/src/main/resources/people.txt")
    val personRDD: RDD[Person] = fileRDD.map(_.split(",")).map(p => Person(p(0).trim, p(1).trim.toInt))

    // The implicit conversions from sqlContext provide toDF() on the RDD.
    import sqlContext.implicits._
    val df: DataFrame = personRDD.toDF()
    df.registerTempTable("student")

    val result: DataFrame = sqlContext.sql("select * from student")
    result.show()
    // These print on the executors; in local mode the output appears in the
    // console, but on a cluster collect() the results to the driver first.
    result.map(t => "name:" + t(0)).foreach(println)
    result.map(_.getAs[String]("name")).foreach(println)
    result.map(_.getValuesMap[Any](List("name", "age"))).foreach(println)
  }
}
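To verify that the case class's parameter names became column names, it helps to print the inferred schema. A minimal check, with the expected output shown as comments (indicative of Spark 1.x output, not captured from a run):

df.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = true)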
2. Programmatically specifying the schema by converting the RDD to Rows
When case classes cannot be defined ahead of time (for example, when the structure of a record is encoded in a string, or a text dataset will be parsed with fields projected differently for different users), a DataFrame is created programmatically in three steps:
(1) Create an RDD of Rows from the original RDD;
(2) Create a schema, represented by a StructType, that matches the structure of the Rows in the RDD from step (1);
(3) Apply the schema to the Row RDD via createDataFrame to obtain a DataFrame.
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Generate the schema based on the string of schema
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
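And again as a self-contained application, this time declaring the field types (StringType, IntegerType) explicitly rather than treating every column as a string: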
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

object RDD2DataFrameByProgrammatically {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkSQL").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Parse each line into a Row whose field values match the schema below.
    val fileRDD: RDD[String] = sc.textFile("examples/src/main/resources/people.txt")
    val rowRDD: RDD[Row] = fileRDD.map(_.split(",")).map(p => Row(p(0).trim, p(1).trim.toInt))

    // Build the schema explicitly instead of inferring it from a case class.
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))

    val df: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
    df.registerTempTable("student")
    sqlContext.sql("select * from student").show()
  }
}
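Since the schema in this approach is ordinary data, it can also be assembled at runtime, which is exactly the situation this method is meant for. Below is a minimal sketch, where the schemaFromString helper and the "name:string,age:int" spec format are assumptions made for illustration rather than part of Spark's API:

import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

// Hypothetical helper: parse a runtime spec such as "name:string,age:int"
// into a StructType usable with sqlContext.createDataFrame.
def schemaFromString(spec: String): StructType = StructType(
  spec.split(",").map { field =>
    val Array(name, tpe) = field.trim.split(":")
    val dataType: DataType = tpe match {
      case "string" => StringType
      case "int"    => IntegerType
      case other    => throw new IllegalArgumentException("unsupported type: " + other)
    }
    StructField(name, dataType, nullable = true)
  }
)

// schemaFromString("name:string,age:int") yields the same schema as the
// Array(StructField(...)) built by hand above.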