当前位置:   article > 正文

spark SQL语法 与 DSL语法_sparksqldsl语法

sparksqldsl语法

spark SQL语法 与 DSL语法

无论是hadoop、spark、flink其都具备一些共性的功能,都试图不断完善自己的功能。

包括:离线批处理api,离线批处理sql编写能力、DSL语法,实时处理能力

Hadoop:只有离线批处理api hive:离线批处理sql编写能力,对hadoop进行功能进行完善

Spark:离线批处理api,离线批处理sql编写能力、DSL语法,实时处理能力

Flink:不区分批处理和流处理,统一表编写程序。其只区分了基础底层datastream api,以及高级接口table api和sql 语法

rdd,dataframe,dataset三者都是分布式弹性数据集Resilient

RDD相比DataFrame不支持sql操作,一般与mlib一起使用。DataFrame是指定了列名的,可以通过列名访问。

DataFrame是Dataset的一个特例,其类型为Dataset[Row]。两者都支持sql操作,比如select,groupby。

spark sql 与hive集成

https://blog.csdn.net/Clown_34/article/details/122421267

共用几种方案:

1将hive的配置文件链接到spark的conf文件夹,还有mysql connector、hdfs的配置文件等。使用spark-shell执行sql语法。或在程序中使用spark程序的sparkcontext,借助spark.sql执行sql语句。

2将hive的配置文件链接到spark的conf文件夹,还有mysql connector、hdfs的配置文件等。spark-sql执行sql语句。也可以开启thriftserver,使用beeline直接执行sql语句,和hiveserver2形式一样。

SQL 与 DSL的转换

#一个dataframe或dataset想要执行sql语句,需要创建View表,才能操作。
val df=spark.read.json("data/user.json")
df.createOrReplaceTempView("people")

#一个sql语句的查询结果就是dataframe,之后就可以执行DSL语法的语句。
val sqlDF=spark.sql("SELECT * FROM people")
sqlDF.show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

SQL语法

sql语法风格是指查询数据时使用sql语句来查询,这种风格的查询必须要有临时试图或者全局视图来辅助

#对DataFrame创建一个临时表,这样可以使用sql进行操作
val df=spark.read.json("data/user.json")
df.createOrReplaceTempView("people")
val sqlDF=spark.sql("SELECT * FROM people")
sqlDF.show()

#普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使
用全局临时表时需要全路径访问,如:global_temp.people

#创建全局表
df.createGlobalTempView("people")
spark.sql("SELECT * FROM global_temp.people").show()
spark.newSession().sql("SELECT * FROM global_temp.people").show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

DataFrame

创建DataFrame
#从本地文件系统的json文件创建dataframe
val df=spark.read.json("data/user.json")
#从RDD创建dataframe
#从hive table进行查询返回
  • 1
  • 2
  • 3
  • 4
DSL语法

domain-specific language,DSL语法用于管理结构化数据,可以使用scala、java、python等编写DSL语法语句,无需创建临时视图使用sql了。

无需编写符合sql规范的语句,可以灵活的与编程语言粘合。

#等同于sql的select语句,注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
df.select($"username",$"age" + 1).show
df.select('username, 'age + 1).show()
#
df.filter($"age">30).show
df.groupBy("age").count.show
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
DSL语法与sql差异
where 和 filter的差异,where是filter的别名
$取列值是语法糖,本质是返回一个column对象
  • 1
  • 2
RDD与DataFrame互相转换

在IDEA开发程序时,如果需要将RDD于DF和DS之间互相操作,需要import spark.implicits._

这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必 须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。

val idRDD=sc.textFile("id.txt")
idRDD.toDF("id").show

#DataFrame转RDD
val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, 
t._2)).toDF
val rdd = df.rdd
val array = rdd.collect
array(0)
array(0)(0)
array(0).getAs[String]("name")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
#开发中,常常通过样例类将rdd转换为dataframe,如果是一个样例类,那么可以直接取属性名作为dataframe的列名
case class User(name:String,age:Int)
sc.makeRDD(List(("zhangsan",30),("lisi",40))).map(t=>User(t._1,t._2)).toDF.show
  • 1
  • 2
  • 3
直接调用toDF实际上是借助隐式转换完成的,一般不使用,我们可以通过createDataFrame转换rdd到df或ds
https://blog.csdn.net/sunyiyuan1213/article/details/91450379
#其中spark是创建的sparkSession
val classDF: DataFrame = spark.createDataFrame(usersRow)
val structDf: DataFrame = spark.createDataFrame(structRow,structSchema)
  • 1
  • 2
  • 3
  • 4
  • 5

Dataset

Dataset是具有强类型的数据集合,需要提供对应的类型信息。

创建Dataset
#使用样例类序列创建DataSet
case class Person(name:String,age:Long)
val caseClassDS=Seq(Person("zhangsan",2)).toDS()
caseClassDS.show

#使用基本类型的序列创建DataSet
val ds=Seq(1,2,3,4,5).toDS
ds.show

#通过 SparkSession.createDataset() 直接创建
val spark = SparkSession.builder().config(conf).getOrCreate();
import spark.implicits._;
val ds = spark.createDataset(List(Person("Jason",34,"DBA"),Person("Tom",20,"Dev")));
ds.show();

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
Dataset与其他类型之间的转换
#注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet
#RDD转DataSet
ds1=sc.makeRDD(List(("zhangsna",30),("lisa",60))).map(t=>User(t._1,t._2)).toDS

#Dataset转RDD
ds1.rdd

#DataFrame和DataSet转换
val df=sc.makeRDD(List(("zhangsan",30))).toDF("name","age")
val ds=df.as[User]
VAL df=ds.toDF
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

三者的共性

1.都是spark平台下的分布式弹性数据集,为处理大型数据提供便利。

2.三者都有惰性机制,在创建、转换,如map方法时,不会立即执行,只有碰到action如foreach时,三者才会开始遍历运算。

3.三者有一些共同的函数,如filter,排序等。

4.在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:import spark.implicits._(在 创建好 SparkSession 对象后尽量直接导入)

5.都有partition的概念

Dataframe相比于rdd,多了列名,可以方便进行sql。rdd无法直接查看每一列的值,必须通过解析。

Dataframe时Dataset的特例,相当于指定类型为Row,类型可以为person、teacher等。
Row是无法知道每列字段的具体类型的,所以其是弱类型的,

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/656956
推荐阅读
相关标签
  

闽ICP备14008679号