赞
踩
Dataset是一个分布式数据集,Dataset是在spark-1.6提出新的API,该API构建在RDD(strong type,使用lambda表达式)之上同时可以借助于Spark SQL对执行引擎的优点,使得使用Dateset执行一些数据的转换比直接使用RDD算子功能和性能都有所提升。因此我们可以认为Dateset就是一个加强版本的RDD。Dataset除了可以使用JVM中数组|集合对象创建之外,也可以将任意的一个RDD转换为Dataset。
Python does not have the support for the Dataset API.
DataFrames 是Dataset的一种特殊情况。比如 Dataset中可以存储任意对象类型的数据作为Dataset的元素。但是Dataframe的元素只有一种类型Row类型,这种基于Row查询和传统数据库中ResultSet操作极其相似。因为Row类型的数据表示Dataframe的一个元素,类似数据库中的一行,这些行中的元素可以通过下标或者column name访问。由于Dateset是API的兼容或者支持度上不是多么的好,但是Dataframe在API层面支持的Scala、Java、R、Python支持比较全面。
df.createGlobalTempView() // 对DF创建全局的临时视图,它产生的表,可以多个spark session共享,它的生命周期和spark application绑定
df.createTempView() // 对DF创建局部的临时视图,它产生的表,仅供创建spark session使用,其它的spark session无法获取
package com.baizhi.sql import org.apache.spark.sql.SparkSession object DataFrameSqlOpt { def main(args: Array[String]): Unit = { // 1. sparkSession是spark sql应用入口,内部封装了sparkconf和sparkContext val spark = SparkSession .builder() .appName("the first spark sql example") .master("local[*]") .getOrCreate() // 2. 创建Dataset val rdd = spark.sparkContext.makeRDD(List("Hello Hadoop", "Hello Scala")).flatMap(_.split(" ")).map((_, 1)) //导入spark的隐式增强之后才能使用RDD的toDF()方法把RDD转换成DataFrame import spark.implicits._ val df = rdd.toDF("word", "num") // 给df起了表名 如果是全局表的话,在访问的时候需要加数据库名【】 // df.createGlobalTempView("t_user") // 对DF创建全局的临时视图,它产生的表,可以多个spark session共享,它的生命周期和spark application绑定 df.createTempView("t_user") // 对DF创建局部的临时视图,它产生的表,仅供创建spark session使用,其它的spark session无法获取 // 再创建一个session,请问是否能够使用全局表? 正确 // val newSparkSession = spark.newSession() // spark.sql("select * from global_temp.t_user").show() // newSparkSession.sql("select * from global_temp.t_user").show() // 再创建一个session,请问是否能够使用局部临时表? 错误 val newSparkSession = spark.newSession() spark.sql("select * from t_user").show() newSparkSession.sql("select * from t_user").show() spark.stop() } }
import spark.implicits._
val userDF = List((1, "zs", true, 18, 15000, 1), (2, "ls", false, 19, 15000, 1)).toDF("id", "name", "sex", "age", "salary", "dept")
userDF.createTempView("t_user")
//查询结果为名字中带有字母"z"并且年龄大于18的
spark.sql("select * from t_user where name like '%z%' and age > 18").show()
// 排序查询
spark.sql(
// 自动将"""引起的内容 进行字符串拼接
"""
select
*
from t_user
order by id desc
""").show()
spark.sql( """ select sex,avg(salary) as avg_salary from t_user group by sex """).show() //--------------------------------------------------------------------------- +-----+----------+ | sex|avg_salary| +-----+----------+ | true| 15000.0| |false| 15000.0| +-----+----------+
// 分组查询 统计男和女的平均工资 spark.sql( """ select sex,avg(salary) as avg_salary from t_user group by sex limit 1 //只返回一条查询结果 """).show() //--------------------------------------------------------------------------- +----+----------+ | sex|avg_salary| +----+----------+ |true| 15000.0| +----+----------+
spark.sql( """ select sex,avg(salary) as avg_salary from t_user group by sex having sex = true //分组后返回性别为true的 """).show() //--------------------------------------------------------------------------- +----+----------+ | sex|avg_salary| +----+----------+ |true| 15000.0| +----+----------+
.sql(
"""
| select
| id,name,age,
| case sex
| when true
| then '男'
| else '女'
| end as newsex
| from t_employee
""".stripMargin)
.show()
var scoreDF = List(
(1, "语文", 100),
(1, "数学", 100),
(1, "英语", 100),
(2, "数学", 79),
(2, "语文", 80),
(2, "英语", 100))
.toDF("id", "course", "score")
scoreDF.createTempView("t_course")
spark.sql(
"""
| select * from t_course
| pivot(max(score) for course in('数学','英语','语文'))
""".stripMargin)
.show()
所谓多维度查询就是根据查询条件的不同组合来查询,例如cube(A,B),它有四种组合情况,(A,B)、(A,null)、(null,B)、(null,null),然后根据这些组合进行分组,这就是多维度分组查询
// cube (A,B) // A null // null B // A B val df2 = List( (110, 50, 80, 80), (120, 60, 95, 75), (120, 50, 96, 70)) .toDF("height", "weight", "uiq", "ueq") df2.createTempView("tt_user") spark.sql( """ | select | height,uiq,avg(uiq) | from | tt_user | group by | cube(height,uiq) """.stripMargin).show() //----------------------------------------------------------------- +------+----+-----------------+ |height| uiq| avg(uiq)| +------+----+-----------------+ | 120|null| 95.5| | null| 80| 80.0| | null|null|90.33333333333333| | null| 95| 95.0| | 120| 95| 95.0| | 110|null| 80.0| | 110| 80| 80.0| | 120| 96| 96.0| | null| 96| 96.0| +------+----+-----------------+
val userInfoDF = spark.sparkContext.makeRDD(List((1, "zs"), (2, "ls"), (3, "ww"))).toDF("id", "name") val orderInfoDF = spark.sparkContext.makeRDD(List((1, "iphone", 1000, 1), (2, "mi9", 999, 1), (3, "连衣裙", 99, 2))).toDF("oid", "product", "price", "uid") userInfoDF.createTempView("t_user") orderInfoDF.createTempView("t_order") //连接类型:inner left_outer right_outer full cross spark.sql( """ | select * from t_user t1 | inner join | t_order t2 | on | t1.id = t2.uid """.stripMargin).show() //------------------------------------------------------------- +---+----+---+-------+-----+---+ | id|name|oid|product|price|uid| +---+----+---+-------+-----+---+ | 1| zs| 1| iphone| 1000| 1| | 1| zs| 2| mi9| 999| 1| | 2| ls| 3| 连衣裙| 99| 2| +---+----+---+-------+-----+---+
以一个select查询的结果作为一张要去查询的表进行查询
// 子查询 val df = List( (1, "zs", true, 1, 15000), (2, "ls", false, 2, 18000), (3, "ww", false, 2, 14000), (4, "zl", false, 1, 18000), (5, "win7", false, 1, 16000) ).toDF("id", "name", "sex", "dept", "salary") df.createTempView("t_employee") spark.sql( """ select id, name, sex, dept from (select * from t_employee) """.stripMargin).show() //----------------------------------------------------- +---+----+-----+----+ | id|name| sex|dept| +---+----+-----+----+ | 1| zs| true| 1| | 2| ls|false| 2| | 3| ww|false| 2| | 4| zl|false| 1| | 5|win7|false| 1| +---+----+-----+----+
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。