赞
踩
Spark官网提供了两种方法来实现从RDD转换得到DataFrame,第一种方法是利用反射机制,推导包含某种类型的RDD,通过反射将其转换为指定类型的DataFrame,适用于提前知道RDD的schema。第二种方法通过编程接口与RDD进行交互获取schema,并动态创建DataFrame,在运行时决定列及其类型。
一、添加maven依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.3</version>
</dependency>
Scala支持使用case class类型导入RDD转换为DataFrame,通过case class创建schema,case class的参数名称会被利用反射机制作为列名。这种RDD可以高效的转换为DataFrame并注册为表。
package com.fgm.sparksql
import org.apache.spark.sql.SparkSession //利用反射,将rdd转换成dataFrame case class Person(val id:Int,val name:String,val age:Int) object SchemaDemo { def main(args: Array[String]): Unit = { //创建SparkSession对象 val sparkSession = SparkSession.builder().appName("Schema").master("local[2]").getOrCreate() //创建SparkContext对象 val sc = sparkSession.sparkContext sc.setLogLevel("WARN") //读取数据文件 val rdd1 = sc.textFile("D:\\tmp\\person.txt").map(_.split(" ")) //将rdd与样例类关联 val personRDD = rdd1.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) //将personRDD转换成DataFrame,需导入隐式转换 import sparkSession.implicits._ val personDF = personRDD.toDF() //dataFrame操作 //DSL风格 personDF.printSchema() personDF.show() personDF.select("name","age").show() personDF.select($"age">30).show() //sql风格语法 personDF.createTempView("person") sparkSession.sql("select * from person").show() sparkSession.sql("select * from person where age>30").show() sparkSession.sql("select * from person where id=3").show() sparkSession.stop() } }
当case class不能提前定义好时,可以通过以下三步创建DataFrame
(1)将RDD转为包含Row对象的RDD
(2)基于StructType类型创建schema,与第一步创建的RDD相匹配
(3)通过sparkSession的createDataFrame方法对第一步的RDD应用schema创建DataFrame
package com.fgm.sparksql import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} /** *通过StructType指定schema,将rdd转换成dataFrame * @Auther: fgm */ object StructTypeSchema { def main(args: Array[String]): Unit = { //创建SparkSession对象 val spark = SparkSession.builder().appName("StructTypSchema").master("local[2]").getOrCreate() //创建SparkContext val sc = spark.sparkContext sc.setLogLevel("WARN") //读取数据 val rdd1 = sc.textFile("D:\\tmp\\person.txt").map(_.split(" ")) //将rdd与rowd对象关联 val rowRDD = rdd1.map(x=>Row(x(0).toInt,x(1),x(2).toInt)) //指定schema val schema=(new StructType).add(StructField("id",IntegerType,true)) .add(StructField("name",StringType,false)) .add(StructField("age",IntegerType,true)) val dataFrame = spark.createDataFrame(rowRDD,schema) dataFrame.printSchema() dataFrame.show() dataFrame.createTempView("person") spark.sql("select * from person").show() spark.stop() } }
HiveContext是对应spark-hive这个项目,与hive有部分耦合, 支持hql,是SqlContext的子类,在Spark2.0之后,HiveContext和SqlContext在SparkSession进行了统一,可以通过操作SparkSession来操作HiveContext和SqlContext。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.3</version>
</dependency>
package com.fgm.sparksql import org.apache.spark.sql.SparkSession /** *SparkSql操作 * * @Auther: fgm */ object HiveSparkSql { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("HiveSparkSql").master("local[2]").enableHiveSupport().getOrCreate() spark.sql("create table user(id int,name string,age int) row format delimited fields terminated by ','") spark.sql("load data local inpath './data/user.txt' into table user") spark.sql("select * from user").show() spark.stop() } }
注意:这里首先在项目根目录下创建data目录(和src同级),然后在data中穿件user.txt文件,并写入相关数据(1,zhangsan,22)。不然会报错。另外需要开启HiveSupport服务:enableHiveSupport()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。