赞
踩
SparkSQL支持对结构化和半结构化数据(如json)加载为一张分布式的表,并提供SQL和DSL(面向对象,调用API)对数据进行操作。
在RDD的基础上封装了两种数据类型,类似二维表格,每一列都有字段与类型(也就是Schema),是懒执行的。DataSet更新,主要区别是有无泛型。
1、DataFrame:RDD+字段类型-泛型
2、DataSet:DataFrame+泛型
泛型:String就是泛型,因此DS可以调用string的相关方法
val DS: Dataset[String]
上下文:
SparkCore:SparkContext
SparkSQL:SparkSession
建表有两种方式1、加载文件并注册,2、RDD转化为DS/DF并注册
查询有两种方式1、SQL(需先注册),2、DSL(对DF、DS进行操作)
Spark默认存储数据格式为parquet。load、save不指定文件则为parquet。
val df : DataFrame = SS.read. [文件类型] ("Path")
Spark支持多种数据源
支持文件格式:text、json、csv,Parquet、ORC(红色为列式存储)
·json默认加载为Dataset[Row]
·CSV可以.option指定分隔符等选项
支持数据库:Mysql、orcle、数仓HIVE等
注意:
想转换RDD必须先导入隐式转换import SparkSession.implicits._(需放在SC之后)
SparkSQL(DF、DS对象)常用方法.printSchema()、.show()
步骤
1、准备环境
2、创建RDD,载入原始数据
3、数据处理(一般用map行转对象,并用数组切割封装RDD)
4、toDF
5、关闭资源
34之间需要指定字段,常用方法有1、指定案例类(步骤3),2、指定字段(步骤4)
1、使用案例类
使用案例类需要放在方法的作用域之外(即java的成员变量位置)
案例 ,图二为案例类
这里是将每行数据切割成数组,封装至案例类,并Map每行变为每个对象的数组
2、指定类型+字段
在数据处理时,封装RDD为指定其类型map每行成为每个对象,并在转化为DF时指定字段名
3、自定义Schema
较少使用,调用SS使用 CreateDataFrame函数,传入rowRDD和schema
df.[repartition(1)].write.format("文件格式").save("path")
1.3.1保存模式
Spark中和hadoop一样,保存的路径之前必须不存在,但Spark提供了.mode(SaveMode.Overwrite),可选择抛出异常、追加、覆盖、忽略等
必须先导入隐式转换import SparkSession.implicits._
DF/DS -> .rdd
DF.rdd为RDD[Row],原来DF为行列,现在列去掉了仅剩下行
RDD -> .toDF/toDS
DStoDF:DS.toDF
DFtoDS:DF.as[泛型名]
注意:
1、DF无泛型,转DS要as[泛型名]
2、DF无泛型,因此DF转RDD为RowRDD
使用SQL之前先将DF/DS创建视图,相当于注册表名
DF.createXXX("ViewName") //相当于注册表名
TempView:视图
Replace:替换
GlobalTempView:全局视图,可以跨SparkSeesion
- SS.sql("语句").show()
- //SS为SparkSession的定义名
查询语法
面向对象的SQL,不需要创建视图,直接对DF/DS操作
DF/DS才能进行.show操作,使用时报错可以看返回类型
- //查询列
- PersonDF.select("name","age").show()
-
- //查询age+1
- //$符:把字符串转换成Column对象取出,必须先import SS.implicits._
- PersonDF.select($"name",$"age"+1).show()
-
- //’符:把列名转换为Column对象取出,以’X1为单一元素,’为分隔符
- PersonDF.select('name,'age+1).show()
-
- //where
- //他俩是一样的
- PersonDF.where("age>25").show()
- PersonDF.filter("age>25").show()
- PersonDF.where('name==="zhangsan").show()
- //注意语法: '==="" 等于===、不等于=!=
-
- //聚合函数
- //聚合函数后成为一个Long/Float类型,需要.var接收
- PersonDF.where("age>25").count()
-
- //分组并统计个数
- PersonDF.groupBy("color").count().show()
-
- //排序(WC案例)
- words.groupBy("value").count().orderBy("count").show()
- //OrderBy参数为排序字段
1、先注册UDF
- SS.udf.register("函数名",函数)
- //实例
- SS.udf.register("StoB", (value:String)=>{ value.toUpperCase() } )
2、调用函数名并传参
如下图案例,在返回的name前添加Name:
5.1 Linux下连接Hive
Spark虽然内置有hive,但一般连接外置Hive,需要进行一些配置
5.2 编译器连接外置Hive
在Idea中读取数据库中文件文件格式需要设置为JDBC,并且需要用.option配置数据库地址等信息
5.2.1从MySQL读
5.2.2从MySQL写
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。