当前位置:   article > 正文

大数据技术之SparkSQL_spark sql的功能

spark sql的功能

标题:Spark SQL: 大数据处理的强大工具
作者:[深海的鱼]
日期:2023/9/7



前言

在大数据时代,数据的处理和分析变得越来越重要。Apache Spark是一个强大的开源大数据处理框架,而Spark SQL是其组件之一,用于处理结构化数据。本文将介绍Spark SQL的基本概念、功能和用途,以及如何使用它来处理大规模数据


一、什么是Spark SQL?

Spark SQL是Spark用来处理`(ETL)数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用
我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

1.Spark SQL的特性
1.1.1 易整合

1.1.2	统一的数据访问方式
1.1.3 	兼容 Hive
1.1.4 	标准的数据连接 SparkSQL 可以看做是一个转换层,向下对接各种不同的结构化数据源,向上提供不同的数据访问方式
  • 1
  • 2
  • 3

2.Spark SQL的功能
2.2.1 结构化数据处理
Spark SQL支持读取和处理多种结构化数据格式,包括Parquet、JSON、CSV等。它能够将这些数据加载到DataFrame中,使用户能够以表格形式查询和分析数据

2.2.2 SQL查询
Spark SQL允许用户使用标准的SQL查询语言来查询数据。这使得那些熟悉SQL的开发人员能够轻松地进行数据分析

2.2.3 分布式计算
与Spark的其他组件一样,Spark SQL也能够利用分布式计算能力,处理大规模数据。它可以在集群上运行,以加速数据处理任务

2.2.4 集成性
Spark SQL可以与其他Spark组件(如Spark Streaming、MLlib等)集成,从而为数据处理提供了更多的可能性。用户可以将结构化数据处理与机器学习、流处理等任务结合使用

二、RDD vs DataFrames vs DataSet

1.1.1 RDD

RDD是一个懒执行的不可变的可以支持Lambda表达式的并行数据集合。
RDD的最大好处就是简单,API的人性化程度很高。
RDD的劣势是性能限制,它是一个JVM驻内存对象,这也就决定了存在GC的限制和数据增加时Java序列化成本的升高。

RDD 例子如

<scala> val rdd1 = sc.textFile("file:///people.txt")
  • 1

1.1.2 Dataframe

var rdd = sc.textFile("")
var dataframe = spark.read.json("***.json")
dataframe.rdd;
import spark.implicits._
rdd.toDF:
  • 1
  • 2
  • 3
  • 4
  • 5

DataFrame多了数据的结构信息,即schema。RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。

DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待
DataFrame也是懒执行的。
性能上比RDD要高,主要有两方面原因:
定制化内存管理
数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制。

优化的执行计划
Json 连接两个文件。Filter(“过滤”) RDD: 两个大文件。 再过滤。比较消耗内存。
查询计划通过Spark catalyst optimiser进行优化.

有一个需求:两个大表.两个超大数据文件.
两个表关联查询,其中一个表中,要通过条件,过滤掉一部分数据.

rdd1.join(rdd2).filter(rdd2)

比如下面一个例子:

users.join(events, users("id") === events("uid")	
		.filter(events("date") > "2015-01-01")
  • 1
  • 2

对于普通开发者而言,查询优化器的意义在于,即便是经验并不丰富的程序员写出的次优的查询,也可以被尽量转换为高效的形式予以执行。
Dataframe的劣势在于在编译期缺少类型安全检查,导致运行时出错.

1.1.3Dataset

1)是Dataframe API的一个扩展,是Spark最新的数据抽象
2)用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。
3)Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
当我需要查询某个对象的某个字段时,不需要加载整个对象(文件)信息。只加载某一列。
4)样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。
5) Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。
6)DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].
7)DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟JSON对象和类对象之间的类比。

<schla> val ds = spark.sqlContext.read.json("file:///")
<schla> ds.show
<schla>ds.filter(_.age <21).show
  • 1
  • 2
  • 3

RDD让我们能够决定怎么做,而DataFrame和DataSet让我们决定做什么,控制的粒度不一样。

1.1.4 三者的共性

1、RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过。

val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000")
val spark = SparkSession.builder().config(sparkconf).getOrCreate()
val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
// map不运行
rdd.map{line=>
  println("运行")
  line._1
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3、三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
4、三者都有partition的概念
5、三者有许多共同的函数,如filter,排序等
6、在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持
import spark.implicits._
7、DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

DataFrame:

testDF.map{
      case Row(col1:String,col2:Int)=>
        println(col1);println(col2)
        col1
      case _=>
        ""
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Dataset:

case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
    testDS.map{
      case Coltest(col1:String,col2:Int)=>
        println(col1);println(col2)
        col1
      case _=>
        ""
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

1.1.4 三者的区别
RDD:
1、RDD一般和spark mlib同时使用
2、RDD不支持sparksql操作
DataFrame:
1、与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,如

testDF.foreach{
  line =>
    val col1=line.getAs[String]("col1")
    val col2=line.getAs[String]("col2")
}
  • 1
  • 2
  • 3
  • 4
  • 5

每一列的值没法直接访问
2、DataFrame与Dataset一般不与spark ml同时使用
3、DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作,如

dataDF.createOrReplaceTempView("tmp")
spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)
  • 1
  • 2

4、DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然

//保存

val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://master01:9000/test")
datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()
//读取
val options = Map("header" ->
  • 1
  • 2
  • 3
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/785620
推荐阅读
相关标签
  

闽ICP备14008679号