当前位置:   article > 正文

Spark SQL_spark sql支持读写哪些类型的数据

spark sql支持读写哪些类型的数据

0 简介

SparkSQL支持对结构化和半结构化数据(如json)加载为一张分布式的表,并提供SQL和DSL(面向对象,调用API)对数据进行操作。

在RDD的基础上封装了两种数据类型,类似二维表格,每一列都有字段与类型(也就是Schema),是懒执行的。DataSet更新,主要区别是有无泛型。

1、DataFrame:RDD+字段类型-泛型 

2、DataSet:DataFrame+泛型

泛型:String就是泛型,因此DS可以调用string的相关方法

val DS: Dataset[String]

上下文:

SparkCore:SparkContext

SparkSQL:SparkSession

建表有两种方式1、加载文件并注册,2、RDD转化为DS/DF并注册

查询有两种方式1、SQL(需先注册),2、DSL(对DF、DS进行操作)

1、加载保存数据

Spark默认存储数据格式为parquet。load、save不指定文件则为parquet。

1.1加载文件为DF/DS

val df : DataFrame = SS.read. [文件类型] ("Path")

Spark支持多种数据源

支持文件格式:text、json、csv,ParquetORC(红色为列式存储)

        ·json默认加载为Dataset[Row]

        ·CSV可以.option指定分隔符等选项

支持数据库:Mysql、orcle、数仓HIVE等

1.2 RDD to DataFrame/DataSet为表建构

注意:

想转换RDD必须先导入隐式转换import SparkSession.implicits._(需放在SC之后)

SparkSQL(DF、DS对象)常用方法.printSchema()、.show()

步骤

1、准备环境

2、创建RDD,载入原始数据

3、数据处理(一般用map行转对象,并用数组切割封装RDD)

4、toDF

5、关闭资源

34之间需要指定字段,常用方法有1、指定案例类(步骤3),2、指定字段(步骤4)

1、使用案例类

使用案例类需要放在方法的作用域之外(即java的成员变量位置)

案例 ,图二为案例类

这里是将每行数据切割成数组,封装至案例类,并Map每行变为每个对象的数组

2、指定类型+字段

在数据处理时,封装RDD为指定其类型map每行成为每个对象,并在转化为DF时指定字段名

3、自定义Schema

较少使用,调用SS使用 CreateDataFrame函数,传入rowRDD和schema

1.3保存文件

df.[repartition(1)].write.format("文件格式").save("path")

1.3.1保存模式

Spark中和hadoop一样,保存的路径之前必须不存在,但Spark提供了.mode(SaveMode.Overwrite),可选择抛出异常、追加、覆盖、忽略等

2、RDD DF DS互转

必须先导入隐式转换import SparkSession.implicits._

DF/DS -> .rdd

        DF.rdd为RDD[Row],原来DF为行列,现在列去掉了仅剩下行

RDD -> .toDF/toDS

DStoDF:DS.toDF

DFtoDS:DF.as[泛型名]

注意:

1、DF无泛型,转DS要as[泛型名]

2、DF无泛型,因此DF转RDD为RowRDD

3、SQL/DSL

3.1SQL

使用SQL之前先将DF/DS创建视图,相当于注册表名

DF.createXXX("ViewName") //相当于注册表名

TempView:视图

Replace:替换

GlobalTempView:全局视图,可以跨SparkSeesion

  1. SS.sql("语句").show()
  2. //SS为SparkSession的定义名

查询语法

3.2DSL

面向对象的SQL,不需要创建视图,直接对DF/DS操作

DF/DS才能进行.show操作,使用时报错可以看返回类型

  1. //查询列
  2. PersonDF.select("name","age").show()
  3. //查询age+1
  4. //$符:把字符串转换成Column对象取出,必须先import SS.implicits._
  5. PersonDF.select($"name",$"age"+1).show()
  6. //’符:把列名转换为Column对象取出,以’X1为单一元素,’为分隔符
  7. PersonDF.select('name,'age+1).show()
  8. //where
  9. //他俩是一样的
  10. PersonDF.where("age>25").show()
  11. PersonDF.filter("age>25").show()
  12. PersonDF.where('name==="zhangsan").show()
  13. //注意语法: '==="" 等于===、不等于=!=
  14. //聚合函数
  15. //聚合函数后成为一个Long/Float类型,需要.var接收
  16. PersonDF.where("age>25").count()
  17. //分组并统计个数
  18. PersonDF.groupBy("color").count().show()
  19. //排序(WC案例)
  20. words.groupBy("value").count().orderBy("count").show()
  21. //OrderBy参数为排序字段

4、UDF

1、先注册UDF

  1. SS.udf.register("函数名",函数)
  2. //实例
  3. SS.udf.register("StoB", (value:String)=>{ value.toUpperCase() } )

2、调用函数名并传参

如下图案例,在返回的name前添加Name: 

5、Spark On Hive

5.1 Linux下连接Hive

Spark虽然内置有hive,但一般连接外置Hive,需要进行一些配置

  • 需要把hive-site.xml 复制到 conf/目录下
  • 把 Mysql 的Connect驱动复制到 jars/目录下
  • 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
  • 重启Spark

5.2 编译器连接外置Hive

6、从数据库中读写

在Idea中读取数据库中文件文件格式需要设置为JDBC,并且需要用.option配置数据库地址等信息

5.2.1从MySQL读

 5.2.2从MySQL写

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/603733
推荐阅读
相关标签
  

闽ICP备14008679号