当前位置:   article > 正文

spark sql 总结

spark sql 总结

一.概述

1.前世今生

  • 大量数据需要处理 ➡️ MapReduce出现
  • sql on mr ➡️ Hive
  • mr效率太低 ➡️ Tez
  • Tez效率低 ➡️ Spark
  • sql on spark ➡️ Shark(太多的的借鉴了Hive制约了它,然后被推翻了,现在已经被弃用)
  • sql on spark ➡️ SparkSql

2.简介

  • Spark SQL是Spark处理数据的一个模块
  • 专门用来处理结构化数据的模块,像json,parquet,avro,csv,普通表格数据等均可。
  • 与基础RDD的API不同,Spark SQL中提供的接口将提供给更多关于结构化数据和计算的信息,并针对这些信息,进行额外的处理优化。

3.操作方式

  • SparkSql shell
    • 类似于hive shell
  • DataFrames API
    • 最早专为sql on spark设计的数据抽象,与RDD相似,增加了数据结构scheme描述信息部分。
    • 写spark代码,面向DF(DataFrams缩写)编程,可以与其它Spark应用代码无缝集成。
    • 比RDD更丰富的算子,更有利于提升执行效率、减少数据读取、执行计划优化。
  • DataSets API
    • 集成了RDD强类型和DataFrames结构化的优点,官方正强力打造的新数据抽象类型。
    • 写spark代码,面向DS编程,可以与其它Spark应用代码无缝集成。
    • 比RDD更丰富的算子,更有利于提升执行效率、减少数据读取、执行计划优化。
    • 面向程序接口对接的操作:通过JDBC、ODBC等方式操作SparkSql,通过jdbc、odbc链接后,发送相关的sparksql请求,实现基于sparksql功能开发。

4.特点

  • 可以利用SQL、DataFrams API、DataSets API或其它语言调用的基于sparksql模块计算,均是sparkcore执行引擎,其对计算的表达是独立的,即开发人员可以轻松在不同API之间切换实现相同的功能。
  • 也可以通过命令行、JDBC、ODBC的方式来操作SparkSQL,方便其它数据平台、BI平台使用SparkSql模块。
  • 在spark应用程序开发中,可以无缝使用SparkSql操作数据。
  • 可以直接使用Hive表格数据。
  • 与Hive的兼容性极好:它复用了Hive的前端(去掉驱动mapreduce执行任务的部分)和元数据,因此可以拿过来hivesql的东西在sparksql上运行即可。
  • SparkSql的应用中,sql是一个重要方面,但不局限制sql。

5.SparkSql愿景

  • 写更少的代码
  • 读更少的数据
  • 把优化的工作交由底层的优化器运行,也是就小白和高手写出来的代码执行效率一样

二.相关名词解释

  • SQL
    • 数据查询语言,面向数据编程的最高抽象
  • HQL = Hive Sql
    • sql on hadoop
  • Shark
    • 最早发展的sql on spark项目,已废弃
  • SparkSql
    • spark on sql 首先

三.shell 操作sparksql

  • 进入环境
    • 直接输入spark-sql
    • 指定运行模式 spark-sql local[*]
    • 类似hive -e 输入spark-sql -e “spark-sql code”
  • 操作
    • 与hive类似

四.DataFrames操作sparksql

1.项目创建

首先根据模板创建一个scala项目
模板:

group:net.alchim31.maven
artifact: scala-archetype-simple
version: 1.7
repository:https://maven.aliyun.com/repository/central

2.配置项目
根目录创建路径null/bin,然后将winutils.exe在这里
导入必要的依赖,并修改scala版本

  1.   <properties>
  2.     <maven.compiler.source>1.8</maven.compiler.source>
  3.     <maven.compiler.target>1.8</maven.compiler.target>
  4.     <encoding>UTF-8</encoding>
  5.     <scala.version>2.11.11</scala.version>
  6.     <scala.compat.version>2.11</scala.compat.version>
  7.     <spec2.version>4.2.0</spec2.version>
  8.   </properties>    
  9.     <!--scala依赖-->
  10.     <dependency>
  11.       <groupId>org.scala-lang</groupId>
  12.       <artifactId>scala-library</artifactId>
  13.       <version>${scala.version}</version>
  14.     </dependency>
  15.     <!--sparkcore依赖 -->
  16.     <dependency>
  17.       <groupId>org.apache.spark</groupId>
  18.       <artifactId>spark-core_${scala.compat.version}</artifactId>
  19.       <version>2.3.2</version>
  20.       <scope>provided</scope>
  21.     </dependency>
  22.     <!--sparksql依赖-->
  23.     <dependency>
  24.       <groupId>org.apache.spark</groupId>
  25.       <artifactId>spark-sql_${scala.compat.version}</artifactId>
  26.       <version>2.3.2</version>
  27.       <scope>provided</scope>
  28.     </dependency>
  29.     <!--log4j-->
  30.     <dependency>
  31.       <groupId>org.apache.logging.log4j</groupId>
  32.       <artifactId>log4j</artifactId>
  33.       <version>2.14.1</version>
  34.     </dependency>

在main下创建目录resouces目录,并将log4j的配置文件放入

3.代码编写

3.1DataFrames1.6

  1. package com.antg.main
  2. import org.apache.spark.sql.SQLContext
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object DataFrames1_6 {
  5.   def main(args: Array[String]): Unit = {
  6.     //创建sparkconf
  7.     val conf = new SparkConf()
  8.     conf.setMaster("local")
  9.     conf.setAppName("测试DF1.6")
  10.     //创建上下文环境
  11.     val sc = new SparkContext(conf)
  12.     //创建sql上下文
  13.     val sqlContext = new SQLContext(sc)
  14.     //读取数据
  15.     val df = sqlContext.read.json("C:\\Users\\Administrator\\Desktop\\data.json")
  16.     //显示全部信息
  17.     df.show()
  18.     //关闭上下文
  19.     sc.stop()
  20.   }
  21. }

3.2Dataframes2.3

  1. package com.antg.main
  2. import org.apache.spark.sql.SparkSession
  3. object DataFrames2_3 {
  4.   def main(args: Array[String]): Unit = {
  5.     //创建session
  6.     val sparkSession =  SparkSession.builder()
  7.       .master("local[*]")
  8.       .appName("dataframes2.3")
  9.       .getOrCreate()
  10.     //创建df
  11.     val df = sparkSession.read.json("C:\\Users\\Administrator\\Desktop\\data.json")
  12.     //虚表
  13.     val vrTable = df.createTempView("vrTable")
  14.     sparkSession.sql("select * from vrTable").show()
  15.     //数据持久化
  16.     df.repartition(2).write.format("parquet").save("./data")
  17.     //关闭
  18.     sparkSession.stop()
  19.   }
  20. }

3.3rdd转换成df

  1. package com.antg.main
  2. import org.apache.spark.sql.{Row, SparkSession}
  3. import org.apache.spark.sql.types.{StringType, StructField, StructType}
  4. object RDD_DF {
  5.   def main(args: Array[String]): Unit = {
  6.     var sparkSession = SparkSession.builder()
  7.       .appName("test_rdd to df")
  8.       .master("local[*]")
  9.       .getOrCreate()
  10.     var scheme = StructType(
  11.       "stdno name classId className".split(" ").map(t => StructField(t,StringType,true))
  12.     )
  13.     var lineRDD = sparkSession.sparkContext.textFile("C:\\Users\\Administrator\\Desktop\\student_mysql.txt")
  14.     var rowRDD = lineRDD.map(_.split("\t")).map(row => Row(row(0),row(1),row(2),row(3)))
  15.     var df = sparkSession.createDataFrame(rowRDD,scheme)
  16.     df.show()
  17.     df.printSchema()
  18.     sparkSession.stop()
  19.   }
  20. }

五.parquet数据格式

  • 概述
    • spark 默认的输入输出格式,spark自带的格式,也是强力推荐的格式
  • 产生背景
    • 面向分析型业务的列式存储格式
    • 由Twitter和Cloudera合作开发,2015年5月从Apache的孵化器里毕业成为Apache顶级项目.
    • Twitter的日志结构是复杂的嵌套数据类型,需要设计一种列式存储格式,既能支持关系型数据(简单数据类型),又能支持复杂的嵌套类型的数据,同时能够适配多种数据处理框架。
  • 优点
    • 压缩数据,内部自带gzip压缩
    • 不失真
    • 减少IO吞吐量
    • 高效的查询
    • 多数据处理平台,均支持parquet,包括hive等。

六.DataSet 操作sparksql

  • 环境搭建
    • 与DataFrames一样
  • 概述
    • DataSet集成了RDD和DataFrame的优点,也称为强类型的DataFrame。
    • DataSets和DataFrames具有完全相同的成员函数。
    • 两者中,每个行的数据类型不同。DataFrame也可以叫Dataset[Row],即DataFrame是Dataset的一种特定形式。而DataSet的每一行是不固定的,需要模式匹配来确定。
  • 版本说明
    • 1.6 版本的时候为测试版本,好多API还不是很丰富
    • 在2.0.0开始DataSet得到正式推广使用,由于其API和DataFrame在成员函数中完全对等,在使用上差异极小,由于是强类型,故仅在数据集case class模式匹配时,有明显差别。

例子

  1. package com.antg.main
  2. import org.apache.spark.sql.SparkSession
  3. case class Student(name:String,age:BigInt)
  4. object TestDS {
  5.   def main(args: Array[String]): Unit = {
  6.     //创建Session
  7.     val sparkSession = SparkSession.builder()
  8.       .appName("ds test")
  9.       .master("local[*]")
  10.       .getOrCreate()
  11.     //引入自动隐式转换
  12.     import sparkSession.implicits._
  13.     //使用基础数据类型创建DataSet
  14.     val a = Seq(1,2,3).toDS()
  15.     //使用DataSet
  16.     a.map(_+1).collect.foreach(println)
  17.     a.show()
  18.     //使用样例类创建DS
  19.     val b = Seq(Student("tom",22)).toDS()
  20.     b.show()
  21.     //通过导入文件创建,并使用样例类指定DS的格式
  22.     val path = "C:\\Users\\Administrator\\Desktop\\student_data.txt"
  23.     val c = sparkSession.read.json(path).as[Student]
  24.     c.show()
  25.     //由于是强类型,所以这里可以很方便的操作ds中的内容
  26.     c.foreach(x=>println(x.age))
  27.   }
  28. }

student_data.txt

  1. {"name":"张一","age":10,"address":"国际庄"}
  2. {"name":"张二","age":20}
  3. {"name":"张三","age":30}
  4. {"name":"张四","age":40}

七.各个数据集的对比分析

  • spark数据集
    • RDD
    • DataFrames
    • DataSet
  • 相同点
    • 全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
    • 三者都是惰性机制,只有遇到Action算子时才会提交作业
    • 有许多共同的函数,如map、filter、sort等。
  • 不同点
    • RDD
      • 不支持sparkSql操作
      • 机器间通信,IO操作均需要序列化和反序列化,性能开销比较大
    • DataFrames
      • 有scheme的RDD,比RDD增加了数据的描述信息
      • 支持sparkSql操作
      • 序列化和反序列化的时候做了结构化优化,减少了不必要的结构化信息,提高了效率
      • 这个是只有固定类型(ROW)的DataSet
    • DataSet
      • 强类型的DataFrames
      • 序列化与反序列化的时候,引入了Encoder机制,达到了按需序列化和反序列化,不必像之前那样整个对象操作了,进一步提高了效率
    • 每个数据类型的应用场景
      • RDD
        • 数据非结构化,如流媒体等
        • 对数据集进行底层的转换、处理、控制
        • 不需要列式处理,而是通过常规的对象.属性来使用数据。
      • DataFrames(必须使用)
        • R或是python语言开发者,使用DF
      • DataSet()必须使用)
        • 在编译时就有高度的类型安全,想要有类型的JVM对象,用上Catalyst优化,并得益于Tungsten生成的高效代码
    • 使用DF、DS场景
      • 需要丰富的语义、高级抽象和特定领域专用的API时
      • 处理需要对半结构化数据进行高级处理,如filter、map、aggregation、average、sum、SQL查询、列式访问或使用lambda函数
      • 在不同的Spark库之间使用一致和简化的API
         
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/711440
推荐阅读
相关标签
  

闽ICP备14008679号