赞
踩
目录
1.3 RDD vs DataFrame vs Dataset
3.3.1 DSL(Domain Specific Language)风格语法
官网:http://spark.apache.org/sql/
Spark SQL是Apache Spark用来处理结构化数据的一个模块,它提供了一个最核心的编程抽象叫做DataFrame,并且作为分布式SQL查询引擎的作用。
SparkSQL可以看做是一个转换层,向下对接各种不同的结构化数据源,向上提供不同的数据访问方式。
| Sql | DataFram | Dataset |
Syntax errors | Run time | Compile time | Compile time |
Analysis errors | Run time | Run time | Compile time |
Dataframe的劣势在于在编译期缺少类型安全检查,导致运行时出错,而Dataset是一个强数据类型,在编译时期就会检查数据类型
在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6测试)—>Dataset(Spark2.x之后)
spark1.6—>2.x改变:本文用的版本是spark2.2.0
RDD是一个懒执行的不可变的可以支持Lambda表达式的并行数据集合。
RDD的最大好处就是简单,API的人性化程度很高。
RDD的劣势是性能限制,它是一个JVM驻内存对象,这也就决定了存在GC的限制和数据增加时Java序列化成本的升高。
与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。
左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。
DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待
DataFrame也是懒执行的。
性能上比RDD要高,主要有两方面原因:
图中构造了两个DataFrame,将它们join之后又做了一次filter操作。如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。
1、RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过。
val sparkconf = new SparkConf().setMaster("local").setAppName("test")
val spark = SparkSession.builder().config(sparkconf).getOrCreate()
val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
// map不运行
rdd.map{line=>
println("运行")
line._1
}
3、三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
4、三者都有partition的概念
5、三者有许多共同的函数,如filter,排序等
6、在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持
import spark.implicits._
7、DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型
DataFrame:
testDF.map{
case Row(col1:String,col2:Int)=>{
println(col1);println(col2)
col1}
case _=> ""
}
Dataset:
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
testDS.map{
case Coltest(col1:String,col2:Int)=>
println(col1);println(col2)
col1
case _=> ""
}
RDD:
1、RDD一般和spark mlib同时使用
2、RDD不支持sparksql操作
DataFrame:
1、与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,每一列的值没法直接访问,如
testDF.foreach{ line =>
val col1=line.getAs[String]("col1")
val col2=line.getAs[String]("col2")
}
2、DataFrame与Dataset一般不与spark ml同时使用
3、DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作,如
dataDF.createOrReplaceTempView("tmp")
spark.sql("select ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)
4、DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然
//保存
val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://master01:9000/test")
datawDF.write.format("com.bigdata.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()
//读取
val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://master01:9000/test")
val datarDF= spark.read.options(options).format("com.bigdata.spark.csv").load()
利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定。
Dataset:
Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。
DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段
而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
/**
rdd("a", 1)("b", 1)("a", 1)
**/
val test: Dataset[Coltest]=rdd.map{line=>
Coltest(line._1,line._2)
}.toDS
test.map{ line=>
println(line.col1)
println(line.col2)
}
可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题
例子:查询年龄大于20岁的用户,创建如下JSON格式的文件
{"name":"jack", "age":22} {"name":"rose", "age":21} {"name":"mike", "age":19}
IDEA中程序的打包和运行都和SparkCore类似,在Maven依赖中导入新的依赖项:
- <!--导入spark sql的依赖jar包-->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
- <version>2.2.0</version>
- </dependency>
程序如下:
import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext} object SaprkSqlFirstDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) //操作sql的专用实例 val sQLContext = new SQLContext(sc) //导入sql实例上的隐式转换 import sQLContext.implicits._ val df: DataFrame = sQLContext.read.json("e://student.json") df.show() df.filter("age > 20").show() df.createTempView("student") sQLContext.sql("select name,age from student where age > 20").show() sc.stop() } }
在老的版本中,SparkSQL提供两种SQL查询起始点,一个叫SQLContext,用于Spark自己提供的SQL查询,一个叫HiveContext,用于连接Hive的查询,SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
val spark = SparkSession.builder() .master("local[*]") .appName(this.getClass.getSimpleName) .getOrCreate() import spark.implicits._
SparkSession.builder 用于创建一个SparkSession。
import spark.implicits._的引入是用于将DataFrames隐式转换成RDD,使df能够使用RDD中的方法。
如果需要Hive支持的话,则需要创建以下语句
val spark = SparkSession.builder() .master("local[*]") .appName(this.getClass.getSimpleName) .enableHiveSupport() //开启spark对hive的支持 .getOrCreate() import spark.implicits._
在Spark SQL中SparkSession是创建DataFrames和执行SQL的入口,创建DataFrames有三种方式,一种是可以从一个存在的RDD进行转换,还可以从Hive Table进行查询返回,或者通过Spark的数据源进行创建。
从已有的RDD进行转换
/* 1001 zhangshan male 1002 lishi female 1003 zhaoliu male */ val file: RDD[String] = sc.textFile("spark_day01/student.txt") val df: DataFrame = file.map(line => { val fiels: Array[String] = line.split(" ") (fiels(0).toInt, fiels(1), fiels(2)) }).toDF("id", "name", "sex") df.show()
其他两种方式我们在后面的数据源介绍
people.txt
tom 18 87
jack 19 88
rose 20 95
import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} object DatasetDSLDemo { // 配置日志的显示级别 Logger.getLogger("org").setLevel(Level.ERROR) def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().master("local[*]") .appName(this.getClass.getSimpleName) .getOrCreate() // 导入隐式转换 import spark.implicits._ val file: Dataset[String] = spark.read.textFile("hdfs://hadoop101:9000/people.txt") val tpDs: Dataset[(String, Int, Int)] = file.map(t => { val split = t.split(" ") (split(0), split(1).toInt, split(2).toInt) }) val pdf: DataFrame = tpDs.toDF("name", "age", "fv") // ("select age from v_person where age > 23 ") // DSL 对应着sql中的一些操作 // select 选择 pdf.select("age","name").show() // where 是sql中的写法 filter where 调用的就是filter pdf.where("age > 18").show() // 排序 orderBy sort 同一个API 默认是升序 pdf.orderBy($"age" desc).show() // 分组聚合 // 统计次数 count(*) pdf.groupBy("age").count().show() // 导入函数 import org.apache.spark.sql.functions._ pdf.groupBy("age").agg(count("*") as "cnts").show() val sumfv = pdf.groupBy("age").agg(sum("fv")) sumfv.show() pdf.groupBy("age").agg(min("fv") as "minfv").orderBy("minfv").show() sumfv.printSchema() // 如何修改字段的名称 sumfv.withColumnRenamed("sum(fv)","sumFV").show() //实际 优先使用SQl 语法风格,一些简单的可以用DSL spark.stop() } }
import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} object DatasetSQLDemo { def main(args: Array[String]): Unit = { //两种方式获取sparkSession实例 val spark = SparkSession.builder().master("local").appName(this.getClass.getSimpleName).getOrCreate() /*val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName) val spark = SparkSession.builder().config(conf).getOrCreate()*/ //在sparkSession中,已经创建好了sqlContext和sparkContext val sqlContext = spark.sqlContext val sc = spark.sparkContext import spark.implicits._ /* 当没有隐式转换时出现下面的错误:缺少import spark.implicits._的支持 Error:(24, 53) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. val tpDS: Dataset[(String, Int, Int)] = file.map(t => { */ //读取外部文件获取到Dataset,在Dataset中自带了schema信息 val file: Dataset[String] = spark.read.textFile("hdfs://hadoop101:9000/people.txt") file.printSchema() val tpDS: Dataset[(String, Int, Int)] = file.map(t => { val strings = t.split(" ") (strings(0), strings(1).toInt, strings(2).toInt) }) tpDS.printSchema() val pdf: DataFrame = tpDS.toDF("name","age","fv") pdf.printSchema() //注册临时试图,使用范围为当前session,session退出后,表就失效了。 //可以注册为全局表,但是当使用全局表时,需要全路径访问,如global_temp.people pdf.createTempView("v_people") pdf.createGlobalTempView("people") //写sql val result = spark.sql("select name, age, fv from v_people where age > 18") val result1 = spark.sql("select name, age, fv from global_temp.people where age > 18" result.show() result1.show() spark.close() } }
Dataset是强类型的数据集合,需要提供对应的数据类型信息
import org.apache.spark.sql.{Dataset, SparkSession} object CreateDataset { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[*]") .appName(this.getClass.getSimpleName) .getOrCreate() import spark.implicits._ val caseClassDS: Dataset[Person] = Seq(Person("jack","18")).toDS() caseClassDS.show() val primitiveDS: Dataset[Int] = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect().foreach(println(_)) val peopleDS = spark.read.json("hdfs://hadoop101:9000/WordCount/student.json").as[Person] peopleDS.show() } } case class Person(name: String, age: String)
Spark SQL支持通过两种方式将存在的RDD转换为Dataset,转换的过程中需要让Dataset获取RDD中的Schema信息,主要有两种方式,一种是通过反射来获取RDD中的Schema信息。这种方式适合于列名已知的情况下。第二种是通过编程接口的方式将Schema信息应用于RDD,这种方式可以处理那种在运行时才能知道列的方式。
SparkSQL能够自动将包含有case类的RDD转换成DataFrame,case类定义了table的结构,之前我们都是使用case类的方式,将case类属性通过反射变成了表的列名。Case类可以包含诸如Seqs或者Array等复杂的结构。
如果case类不能够提前定义,可以通过下面三个步骤定义一个DataFrame
创建一个多行结构的RDD;
创建用StructType来表示的行结构信息。
通过SparkSession提供的createDataFrame方法来应用Schema 。
import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ object CreateBySetSchema { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .master("local[*]") .appName("CreateBySetSchema") .getOrCreate() val peopleRDD: RDD[String] = spark.sparkContext.textFile("hdfs://hadoop101:9000/people.txt") //创建schema信息 val schemaDemo = StructType( Seq( StructField("name", StringType), StructField("age", IntegerType), StructField("fv", IntegerType) )) import org.apache.spark.sql._ //将读取到的数据切分好字段,转换为Row val rowRDD = peopleRDD .map(_.split(" ")) .map(attributes => Row(attributes(0), attributes(1).toInt,attributes(2).toInt)) //将创建的schema信息和Row传入即可创建DF val peopleDF: DataFrame = spark.createDataFrame(rowRDD,schemaDemo) peopleDF.createTempView("people") val result: DataFrame = spark.sql("select * from people") result.show() } }
RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换
这个转换很简单
val rdd1=testDF.rdd
val rdd2=testDS.rdd
import spark.implicits._
val testDF = rdd.map { line => (line._1,line._2) }.toDF("col1","col2")
一般用元组把一行的数据写在一起,然后在toDF中指定字段名
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd.map {line=>
Coltest(line._1,line._2)
}.toDS
可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可
这个也很简单,因为只是把case class封装成Row
import spark.implicits._
val testDF = testDS.toDF
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]
这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。
在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用
import org.apache.spark.sql.{DataFrame, SparkSession} object DefFunctionUDF { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[*]") .appName(this.getClass.getSimpleName) .getOrCreate() import spark.implicits._ val df: DataFrame = spark.read.json("hdfs://hadoop101:9000/WordCount/student.json") df.show() //自定义函数并注册 spark.udf.register("addName",(x: String) => "Name:" + x) df.createTempView("student") //使用自定函数 spark.sql("select addName(name),age from student").show() /* +-----------------+---+ |UDF:addName(name)|age| +-----------------+---+ | Name:jack| 22| | Name:rose| 21| | Name:mike| 19| +-----------------+---+ */ } }
强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。
1. 弱类型用户自定义聚合函数:通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数。下面展示一个求平均工资的自定义聚合函数。
salary.json
{"name":"jack", "salary":1500}
{"name":"rose", "salary":1300}
{"name":"mike", "salary":1800}
{"name":"jone", "salary":1600}
import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession object MyAverage extends UserDefinedAggregateFunction { // 聚合函数输入参数的数据类型 def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // 聚合缓冲区中值得数据类型 def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // 返回值的数据类型 def dataType: DataType = DoubleType // 对于相同的输入是否一直返回相同的输出。 def deterministic: Boolean = true // 初始化 def initialize(buffer: MutableAggregationBuffer): Unit = { // 存工资的总额 buffer(0) = 0L // 存工资的个数 buffer(1) = 0L } // 相同Execute间的数据合并。 def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // 不同Execute间的数据合并 def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // 计算最终结果 def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) }
import org.apache.spark.sql.SparkSession object DefFunctionText { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[*]") .appName(this.getClass.getSimpleName) .getOrCreate() import spark.implicits._ // 注册函数 spark.udf.register("myAverage", MyAverage) val df = spark.read.json("e://salary.json") df.createOrReplaceTempView("employees") df.show() // +----+------+ // |name|salary| // +----+------+ // |jack| 1500| // |rose| 1300| // |mike| 1800| // |jone| 1600| // +----+------+ val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show() // +--------------+ // |average_salary| // +--------------+ // | 1550.0| // +--------------+ } }
2. 强类型用户自定义聚合函数:通过继承Aggregator来实现强类型自定义聚合函数,同样是求平均工资
import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.Encoder import org.apache.spark.sql.Encoders import org.apache.spark.sql.SparkSession // 既然是强类型,可能有case类 case class Employee(name: String, salary: Long) case class Average(var sum: Long, var count: Long) object MyAverage1 extends Aggregator[Employee, Average, Double] { // 定义一个数据结构,保存工资总数和工资总个数,初始都为0 def zero: Average = Average(0L, 0L) // Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object def reduce(buffer: Average, employee: Employee): Average = { buffer.sum += employee.salary buffer.count += 1 buffer } // 聚合不同execute的结果 def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum b1.count += b2.count b1 } // 计算输出 def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count // 设定之间值类型的编码器,要转换成case类 // Encoders.product是进行scala元组和case类转换的编码器 def bufferEncoder: Encoder[Average] = Encoders.product // 设定最终输出值的编码器 def outputEncoder: Encoder[Double] = Encoders.scalaDouble }
import org.apache.spark.sql.{Dataset, SparkSession, TypedColumn} object DefFunctionText2 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[*]") .appName(this.getClass.getSimpleName) .getOrCreate() //导入隐式转换 import spark.implicits._ //创建Dataset val ds: Dataset[Employee] = spark.read.json("e://salary.json").as[Employee] ds.createOrReplaceTempView("employees") ds.show() // +----+------+ // |name|salary| // +----+------+ // |jack| 1500| // |rose| 1300| // |mike| 1800| // |jone| 1600| // +----+------+ val averageSalary: TypedColumn[Employee, Double] = MyAverage1.toColumn.name("average_salary") val result = ds.select(averageSalary) result.show() // +--------------+ // |average_salary| // +--------------+ // | 1550.0| // +--------------+ } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。