赞
踩
Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是 DataFrame
。
DataFrame=RDD+Schema
它其实和关系型数据库中的表非常类似,RDD可以认为是表中的数据,Schema是表结构信息。 DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据 库,以及RDD
Spark1.3出现的 DataFrame ,Spark1.6出现了 DataSet ,在Spark2.0中两者统一,DataFrame等于 DataSet[Row]
要使用Spark SQL,首先需要创建一个SpakSession
对象
SparkSession中包含了SparkContext
和SqlContext
所以说想通过SparkSession来操作RDD的话需要先通过它来获取SparkContext
这个SqlContext是使用sparkSQL操作hive的时候会用到的
使用SparkSession,可以从RDD、HIve表或者其它数据源创建DataFrame
那下面我们来使用JSON文件来创建一个DataFrame
想要使用spark-sql需要先添加spark-sql的依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
使用案例
Scala版本
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object SqlDemoScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //创建SparkSession对象,里面包含SparkContext和SqlContext val sparkSession = SparkSession.builder() .appName("SqlDemoScala") .config(conf) .getOrCreate() //读取json文件,获取DataFrame val stuDf = sparkSession.read.json("~/student.json") //查看DataFrame中的数据 stuDf.show() sparkSession.stop() } }
Java版本
import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public class SqlDemoJava { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); //创建SparkSession对象,里面包含SparkContext和SqlContext SparkSession sparkSession = SparkSession.builder() .appName("SqlDemoJava") .config(conf) .getOrCreate(); //读取json文件,获取Dataset<Row> Dataset<Row> stuDf = sparkSession.read().json("~/student.json"); stuDf.show(); sparkSession.stop(); } }
由于DataFrame等于DataSet[Row],它们两个可以互相转换,所以创建哪个都是一样的
前面的scala代码默认创建的是DataFrame,java代码默认创建的是DataSet
尝试对他们进行转换
在Scala代码中将DataFrame转换为DataSet[Row],对后面的操作没有影响
// 将DataFrame转换为DataSet[Row]
val stuDf = sparkSession.read.json("~/student.json").as("stu")
在Java代码中将DataSet[Row]转换为DataFrame
// 将Dataset<Row>转换为DataFrame
Dataset<Row> stuDf = sparkSession.read().json("~/student.json").toDF();
DataFrame有以下常用算子:
Scala版本
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object DataFrameOpScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //创建SparkSession对象,里面包含SparkContext和SqlContext val sparkSession = SparkSession.builder() .appName("DataFrameOpScala") .config(conf) .getOrCreate() val stuDf = sparkSession.read.json("~/student.json") //打印schema信息 stuDf.printSchema() //默认显示所有数据,可以通过参数控制显示多少条 stuDf.show(2) //查询数据中的指定字段信息 stuDf.select("name","age").show() //在使用select的时候可以对数据做一些操作,需要添加隐式转换函数,否则语法报错 import sparkSession.implicits._ stuDf.select($"name",$"age"+1).show() //对数据进行过滤,需要添加隐式转换函数,否则报错 stuDf.filter($"age">18).show() //where底层调用的就是filter stuDf.where($"age">18).show() //对数据进行分组求和 stuDf.groupBy("age").count().show() sparkSession.stop() } }
Java版本
import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import static org.apache.spark.sql.functions.col; public class DataFrameOpJava { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); //创建SparkSession对象,里面包含SparkContext和SqlContext SparkSession sparkSession = SparkSession.builder() .appName("DataFrameOpJava") .config(conf) .getOrCreate(); //读取json文件,获取Dataset<Row> Dataset<Row> stuDf = sparkSession.read().json("~/student.json"); //打印schema信息 stuDf.printSchema(); //默认显示所有数据,可以通过参数控制显示多少条 stuDf.show(2); //查询数据中的指定字段信息 stuDf.select("name","age").show(); //在使用select的时候可以对数据做一些操作,需要引入 import static org.apache.spark.sql.functions.col; stuDf.select(col("name"),col("age").plus(1)).show(); //对数据进行过滤 stuDf.filter(col("age").gt(18)).show(); stuDf.where(col("age").gt(18)).show(); //对数据进行分组求和 stuDf.groupBy("age").count().show(); sparkSession.stop(); } }
这些就是针对DataFrame的一些常见的操作。
但是现在这种方式其实用起来还是不方便,只是提供了一些类似于可以操作表的算子,很对一些简单的查询还是可以的,但是针对一些复杂的操作,使用算子写起来就很麻烦了,所以我们希望能够直接支持用sql的方式执行,Spark SQL也是支持的
想要实现直接支持sql语句查询DataFrame中的数据
需要两步操作
下面来看一个案例
Scala版本
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object DataFrameSqlScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //创建SparkSession对象,里面包含SparkContext和SqlContext val sparkSession = SparkSession.builder() .appName("DataFrameSqlScala") .config(conf) .getOrCreate() val stuDf = sparkSession.read.json("~/student.json") //将DataFrame注册为一个临时表 stuDf.createOrReplaceTempView("student") //使用sql查询临时表中的数据 sparkSession.sql("select age,count(*) as num from student group by age") .show() sparkSession.stop() } }
java版本
import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public class DataFrameSqlJava { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); //创建SparkSession对象,里面包含SparkContext和SqlContext SparkSession sparkSession = SparkSession.builder() .appName("DataFrameSqlJava") .config(conf) .getOrCreate(); Dataset<Row> stuDf = sparkSession.read().json("D:\\student.json"); //将Dataset<Row>注册为一个临时表 stuDf.createOrReplaceTempView("student"); //使用sql查询临时表中的数据 sparkSession.sql("select age,count(*) as num from student group by age") .show(); sparkSession.stop(); } }
为什么要将RDD转换为DataFrame?
在实际工作中我们可能会先把hdfs上的一些日志数据加载进来,然后进行一些处理,最终变成结构化的数据,希望对这些数据做一些统计分析,当然了我们可以使用spark中提供的transformation
算子来实现,只不过会有一些麻烦,毕竟是需要写代码的,如果能够使用sql实现,其实是更加方便的。
所以可以针对我们前面创建的RDD,将它转换为DataFrame,这样就可以使用dataFrame中的一些算子或者直接写sql来操作数据了。
Spark SQL支持这两种方式将RDD转换为DataFrame
这种方式是使用反射来推断RDD中的元数据。
基于反射的方式,代码比较简洁,也就是说当你在写代码的时候,已经知道了RDD中的元数据,这样的话使用反射这种方式是一种非常不错的选择。
Scala具有隐式转换的特性,所以spark sql的scala接口是支持自动将包含了case class的RDD转换为 DataFrame的
Scala版本
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object RddToDataFrameByReflectScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //创建SparkSession对象,里面包含SparkContext和SqlContext val sparkSession = SparkSession.builder() .appName("RddToDataFrameByReflectScala") .config(conf) .getOrCreate() //获取SparkContext val sc = sparkSession.sparkContext val dataRDD = sc.parallelize(Array(("jack",18),("tom",20),("jessic",30))) //基于反射直接将包含Student对象的dataRDD转换为dataFrame //需要导入隐式转换 import sparkSession.implicits._ val stuDf = dataRDD.map(tup=>Student(tup._1,tup._2)).toDF() //下面就可以通过DataFrame的方式操作dataRDD中的数据了 stuDf.createOrReplaceTempView("student") //执行sql查询 val resDf = sparkSession.sql("select name,age from student where age > 18") //将DataFrame转化为RDD val resRDD = resDf.rdd //从row中取数据,封装成student,打印到控制台 resRDD.map(row=>Student(row(0).toString,row(1).toString.toInt)) .collect() .foreach(println(_)) //使用row的getAs方法,获取指定列名的值 resRDD.map(row=>Student(row.getAs[String]("name"),row.getAs[Int]("age"))) .collect() .foreach(println(_)) sparkSession.stop() } } //定义一个Student case class Student(name: String,age: Int)
Java版本
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import scala.Tuple2; import java.util.Arrays; import java.util.List; public class RddToDataFrameReflectJava { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); //创建SparkSession对象,里面包含SparkContext和SqlContext SparkSession sparkSession = SparkSession.builder() .appName("RddToDataFrameReflectJava") .config(conf) .getOrCreate(); //获取SparkContext //从sparkSession中获取的是scala中的SparkContext,所以需要转换成java中的SparkContext JavaSparkContext sc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext()); Tuple2<String, Integer> t1 = new Tuple2<>("jack", 18); Tuple2<String, Integer> t2 = new Tuple2<>("tom", 20); Tuple2<String, Integer> t3 = new Tuple2<>("jessic", 30); JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3)); JavaRDD<Student> stuRDD = dataRDD.map(new Function<Tuple2<String, Integer>, Student>() { @Override public Student call(Tuple2<String, Integer> tup) throws Exception { return new Student(tup._1, tup._2); } }); //注意:Student这个类必须声明为public,并且必须实现序列化 Dataset<Row> stuDf = sparkSession.createDataFrame(stuRDD, Student.class); stuDf.createOrReplaceTempView("student"); //执行sql查询 Dataset<Row> resDf = sparkSession.sql("select name,age from student where age > 18"); //将DataFrame转化为RDD,注意,这里需要转化为JavaRDD JavaRDD<Row> resRDD = resDf.javaRDD(); //从row中取数据,封装成student,打印到控制台 List<Student> resList = resRDD.map(new Function<Row, Student>() { @Override public Student call(Row row) throws Exception { //return new Student(row.getString(0),row.getInt(1)); //通过getAs获取数据 return new Student(row.getAs("name").toString(), Integer.parseInt(row.getAs("age").toString())); } }).collect(); for(Student stu: resList){ System.out.println(stu); } sparkSession.stop(); } }
这种方式是通过编程接口来创建DataFrame,你可以在程序运行时动态构建一份元数据,就是Schema,然后将其应用到已经存在的RDD上。这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能通过这种动态构建元数据的方式。
也就是说当case calss中的字段无法预先定义的时候,就只能用编程方式动态指定元数据了 下面看一个案例:
Scala版本
import org.apache.spark.SparkConf import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object RddToDataFrameByProgramScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //创建SparkSession对象,里面包含SparkContext和SqlContext val sparkSession = SparkSession.builder() .appName("RddToDataFrameByReflectScala") .config(conf) .getOrCreate() //获取SparkContext val sc = sparkSession.sparkContext val dataRDD = sc.parallelize(Array(("jack",18),("tom",20),("jessic",30))) //组装rowRDD val rowRDD = dataRDD.map(tup=>Row(tup._1,tup._2)) //指定元数据信息【这个元数据信息就可以动态从外部获取了,比较灵活】 val schema = StructType(Array( StructField("name",StringType,true), StructField("age",IntegerType,true) )) //组装DataFrame val stuDf = sparkSession.createDataFrame(rowRDD,schema) //下面就可以通过DataFrame的方式操作dataRDD中的数据了 stuDf.createOrReplaceTempView("student") //执行sql查询 val resDf = sparkSession.sql("select name,age from student where age > 18") //将DataFrame转化为RDD val resRDD = resDf.rdd resRDD.map(row=>(row(0).toString,row(1).toString.toInt)) .collect() .foreach(println(_)) sparkSession.stop() } }
Java版本
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.sources.In; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.Tuple2; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public class RddToDataFrameByProgramJava { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); //创建SparkSession对象,里面包含SparkContext和SqlContext SparkSession sparkSession = SparkSession.builder() .appName("RddToDataFrameByProgramJava") .config(conf) .getOrCreate(); //获取SparkContext //从sparkSession中获取的是scala中的SparkContext,所以需要转换成java中的SparkContext JavaSparkContext sc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext()); Tuple2<String, Integer> t1 = new Tuple2<>("jack", 18); Tuple2<String, Integer> t2 = new Tuple2<>("tom", 20); Tuple2<String, Integer> t3 = new Tuple2<>("jessic", 30); JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3)); //组装RDD JavaRDD<Row> rowRDD = dataRDD.map(new Function<Tuple2<String, Integer>, Row>() { @Override public Row call(Tuple2<String, Integer> tup) throws Exception { return RowFactory.create(tup._1, tup._2); } }); //指定元数据信息 ArrayList<StructField> structFieldList = new ArrayList<>(); structFieldList.add(DataTypes.createStructField("name",DataTypes.StringType,true)); structFieldList.add(DataTypes.createStructField("age",DataTypes.IntegerType,true)); StructType schema = DataTypes.createStructType(structFieldList); //构建DataFrame Dataset<Row> stuDf = sparkSession.createDataFrame(rowRDD, schema); stuDf.createOrReplaceTempView("student"); Dataset<Row> resDf = sparkSession.sql("select name,age from student where age > 18"); JavaRDD<Row> resRDD = resDf.javaRDD(); List<Tuple2<String, Integer>> resList = resRDD.map(new Function<Row, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<String, Integer>(row.getString(0), row.getInt(1)); } }).collect(); for(Tuple2<String,Integer> tup: resList){ System.out.println(tup); } sparkSession.stop(); } }
对于Spark SQL的DataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的load
和save
操作。
load操作主要用于加载数据,创建出DataFrame;
save操作,主要用于将DataFrame中的数据保存到文件中。
我们前面操作json格式的数据的时候好像没有使用load方法,而是直接使用的json方法,这是什么特殊用法吗?
查看json方法的源码会发现,它底层调用的是format和load方法
def json(paths: String*): DataFrame = format("json").load(paths : _*)
我们如果使用原始的format和load方法加载数据,此时如果不指定format,则默认读取的数据源格式是parquet
,也可以手动指定数据源格式。
Spark SQL
内置了一些常见的数据源类型,比如json, parquet, jdbc, orc, csv, text
通过这个功能,就可以在不同类型的数据源之间进行转换了。
下面来看使用案例:
Scala版本
import org.apache.spark.SparkConf import org.apache.spark.sql.{SaveMode, SparkSession} object LoadAndSaveOpScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") //创建SparkSession对象,里面包含SparkContext和SqlContext val sparkSession = SparkSession.builder() .appName("LoadAndSaveOpScala") .config(conf) .getOrCreate() //读取数据 val stuDf = sparkSession.read.format("json").load("~/student.json") //保存数据 stuDf.select("name","age") .write .format("csv") .mode(SaveMode.Append)//追加 .save("hdfs://bigdata01:9000/out-save001") sparkSession.stop() } }
import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public class LoadAndSaveOpJava { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); //创建SparkSession对象,里面包含SparkContext和SqlContext SparkSession sparkSession = SparkSession.builder() .appName("LoadAndSaveOpJava") .config(conf) .getOrCreate(); //读取数据 Dataset<Row> stuDf = sparkSession.read().format("json").load("~/student.json"); //保存数据 stuDf.select("name","age") .write() .format("csv") .save("hdfs://bigdata01:9000/out-save002"); sparkSession.stop(); } }
执行代码,查看结果,csv文件是使用逗号分隔的:
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /out-save001
Found 2 items
-rw-r--r-- 3 yehua supergroup 0 2020-05-29 17:53 /out-save001/_SUC
-rw-r--r-- 3 yehua supergroup 46 2020-05-29 17:53 /out-save001/part
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out-save001/part-00000-9bf82de6
jack,19
tom,18
jessic,27
hehe,18
haha,15
Spark SQL对于save操作,提供了不同的save mode。
主要用来处理,当目标位置已经有数据时应该如何处理。save操作不会执行锁操作,并且也不是原子 的,因此是有一定风险出现脏数据的。
SaveMode | 解释 |
---|---|
SaveMode.ErrorIfExists (默认) | 如果目标位置已经存在数据,那么抛出一个异常 |
SaveMode.Append | 如果目标位置已经存在数据,那么将数据追加进去 |
SaveMode.Overwrite | 如果目标位置已经存在数据,那么就将已经存在的数据删除,然后再写入 |
SaveMode.Ignore | 如果目标位置已经存在数据,那么就忽略,不做任何操作 |
在LoadAndSaveOpScala中增加SaveMode的设置,重新执行,验证结果
将SaveMode设置为Append,如果目标已存在,则追加
stuDf.select("name","age")
.write
.format("csv")
.mode(SaveMode.Append) //追加
.save("hdfs://bigdata01:9000/out-save001")
执行之后的结果确实是追加到之前的结果目录中了
Spark中提供了很多内置的函数
种类 | 函数 |
---|---|
聚合函数 | avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct |
集合函数 | array_contains, explode, size |
日期/时间函数 | datediff, date_add, date_sub, add_months, last_day, next_day, months_between, current_date, current_timestamp, date_format |
数学函数 | abs, ceil, floor, round |
混合函数 | if, isnull, md5, not, rand, when |
字符串函数 | concat, get_json_object, length, reverse, split, upper |
窗口函数 | denseRank, rank, rowNumber |
这里面的函数和hive中的函数是类似的
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。