赞
踩
1.什么是sparkSQL
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
2.特点:
易整合,统一的数据访问方式,兼容hive,标准的数据连接
与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上 看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验。
DataFrame=RDD+schema
当我们创建了一个DataFrame后,就可以使用SQL语句或者DSL两种方式进行编程了。
创建DataFrame的过程:
1.创建一个普通的RDD
2.定义Schema
3.将RDD与schema关联
1.创建SparkContext
2.创建SQLContext
3.创建RDD
4.创建一个类,并定义类的成员变量
5.整理数据并关联 case class,将非结构化的数据转换成结构化数据
6.将RDD转换成DataFrame(导入隐式转换)
7.将DataFrame注册成临时表
8.书写SQL(Transformation,lazy,如果一条SQL语句不行,就执行完一条SQL语句后,再注册成临时表,然后再写一条SQL))
9.执行Action
package XXX import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext} object SQLDemo1Again { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("SQLDemo1Again").setMaster("local[4]") val sc = new SparkContext(conf) //强化SparkContext,使他能够创建DataFrame val sqlContext = new SQLContext(sc) //读取数据 val lines: RDD[String] = sc.textFile("hdfs或本地文件系统") //整理数据 val girlRDD: RDD[Girl] = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) val age = fields(2).toInt val fv = fields(3).toDouble Girl(id, name, age, fv) }) //将girlRDD转换成DataFrame import sqlContext.implicits._ val df: DataFrame = girlRDD.toDF() //书写sql语句,进行排序 //使用SQL API,要先将DataFrame注册成临时表 df.registerTempTable("t_girl") //书写SQL语句 val sorted: DataFrame = sqlContext.sql("SELECT * FROM t_girl ORDER BY fv DESC,age ASC") //展示结果 sorted.show() //释放资源 sc.stop() } } case class Girl(id:Long,name:String,age:Int,fv:Double)
1.创建SparkContext
2.创建SQLContext
3.创建RDD
4.创建StructType(schema)
5.整理数据将数据跟Row关联
6.通过rowRDD和schema创建DataFrame
7.将DataFrame注册成临时表
8.书写SQL(Transformation)
9.执行Action
package XXX import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext} object SQLDemo2Again { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("SQLDemo2Again").setMaster("local[4]") val sc = new SparkContext(conf) //强化SparkContext,使他能够创建DataFrame val sqlContext = new SQLContext(sc) //读取数据 val lines: RDD[String] = sc.textFile("hdfs或本地文件系统") //整理数据 val rowRDD1: RDD[Row] = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) val age = fields(2).toInt val fv = fields(3).toDouble Row(id, name, age, fv) }) //创建StructType,就是schema信息,表头 val schema1: StructType = StructType(List( StructField("id", LongType, true), StructField("name", StringType, true), StructField("age", IntegerType, true), StructField("fv", DoubleType, true) )) //将rowRDD转换成DataFrame val df: DataFrame = sqlContext.createDataFrame(rowRDD1,schema1) //将DataFrama关联成临时表 df.registerTempTable("t_person") //书写SQL val sorted: DataFrame = sqlContext.sql("SELECT * FROM t_person ORDER BY fv DESC,age ASC") //展示结果 sorted.show() //释放资源 sc.stop() } }
package XXX import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext} import org.apache.spark.sql.types._ object SQLDemo3Again { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("SQLDemo2Again").setMaster("local[4]") val sc = new SparkContext(conf) //强化SparkContext,使他能够创建DataFrame val sqlContext = new SQLContext(sc) //读取数据 val lines: RDD[String] = sc.textFile("hdfs或本地文件系统") //整理数据 val rowRDD1: RDD[Row] = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) val age = fields(2).toInt val fv = fields(3).toDouble Row(id, name, age, fv) }) //创建StructType,就是schema信息,表头 val schema1: StructType = StructType(List( StructField("id", LongType, true), StructField("name", StringType, true), StructField("age", IntegerType, true), StructField("fv", DoubleType, true) )) //将rowRDD转换成DataFrame val dfApi: DataFrame = sqlContext.createDataFrame(rowRDD1,schema1) //不使用SQL的方式,就不用注册临时表了 //使用DataFrame上的方法(DSL) val dfApi1: DataFrame = dfApi.select("name","age","fv") //排序 //$符号表示把后面跟的字段当作一列 //这里的desc只能用小写,因为这个desc是一个方法 //desc是一个方法,所以要使用隐式转换 import sqlContext.implicits._ val sorted: Dataset[Row] = dfApi1.orderBy($"fv" desc,$"age" asc) //展示结果 sorted.show() sc.stop() } }
与SparkSQL 1.X差别不大,连接Spark集群更加方便,而且API更加简洁。
使用sql语句
package XXX import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.types._ object SQLTest1Again { def main(args: Array[String]): Unit = { //创建SparkSession,与spark建立联系 val session = SparkSession.builder() .appName("SQLTest1Again") .master("local[4]") .getOrCreate() //创建RDD val ssc: SparkContext = session.sparkContext //读取数据 val lines: RDD[String] = ssc.textFile("hdfs或本地文件系统") //整理数据 val rowRDD2: RDD[Row] = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) val age = fields(2).toInt val fv = fields(3).toDouble Row(id, name, age, fv) }) //生成schema val schema2: StructType = StructType(List( StructField("id", LongType, true), StructField("name", StringType, true), StructField("age", IntegerType, true), StructField("fv", DoubleType, true) )) //创建DataFrame val sdf: DataFrame = session.createDataFrame(rowRDD2,schema2) //排序 //这里的>是一个方法,所以要导入隐式转换 import session.implicits._ val sorted: Dataset[Row] = sdf.where($"fv" > 98).orderBy($"fv" desc,$"age" asc) //展示结果 sorted.show() session.stop() } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。