当前位置:   article > 正文

Spark Sql中的Map和flatMap_spark flatmap

spark flatmap

和她在一起的每一天都很快乐

        map() 将一个函数应用于DataFrame和DataSet中的每一行并返回新的转换后的DataSet。并不会返回DataFrame,返回的是DataSet[类型].

        flatMap()在对每个元素应用函数之后,flatMap会将数据转换成数据帧/数据集展平,并且返回一个新的数据集。

关键点

        1.map()和flatMap()返回的都是DataSet(DataFrame=DataSet[Row])

        2.flatMap在某些列上可能会产生冗余的数据

        3.map返回的是与输入DtaFrame中相同的记录,flatMap为每个记录返回许多记录,一对多。

使用Map进行转换

        先看一下dataFrame数据集。

        通过创建一个数据集,然后创建一个schema映射。通过schema映射成字段名。

 

        使用Map进行操作,将数据根据空格分隔开。

  1. val value = dataFrame.map(fun => {
  2. val strings = fun.getString(0).split(" ")
  3. strings })
  4. value.show()

        因为上面的dataFrame中,只有一列,就是data列

        所以下面map的时候

        fun.getString(0)就是获取data列的数据

        如果使用SQL语句进行操作,可以使用explode操作进行炸裂、

        首先split通过空格,拆分之后形成数组,形成数组之后进行explode进行炸裂

  select explode(split(data," ")) from table

        经过map之后,Projext Gutenberg's ===》 projext Gutenberg's

使用FlatMap转换

        对每个元素应用函数之后,Spark FlatMap将会展平DataFrame列。并分别返回一个新的DataFrame。

        返回后的DataFrame可能具有和之前DataFrame数据量一样的元素,也有可能比之前的元素更多,因为炸裂了。

        下面会通过两个案例介绍flatMap,第一个案例比较简单,第二个案例稍微有一些难度,需要大家动一动脑子。

  1. val value = dataFrame.flatMap(fun => {
  2. val strings = fun.getString(0).split(" ")
  3. strings })
  4. value.show()

 

        不同的是,map案例中,split函数之后,变成了数组。在flatMap案例中,split函数之后,从数组转换为一行。将数组里面的元素分成一个个元素。

flatMap案例2

        创建数据集并且创建StructType映射字段名

  1. val arrayStructureData = Seq(
  2. Row("James,,Smith",List("Java","Scala","C++"),"CA"), Row("Michael,Rose,",List("Spark","Java","C++"),"NJ"), Row("Robert,,Williams",List("CSharp","VB","R"),"NV") )
  3. val arrayStructureSchema = new StructType()
  4. .add("name",StringType)
  5. .add("languagesAtSchool", ArrayType(StringType))
  6. .add("currentState", StringType)

        这个案例的目的是什么呢?

        首先看一下这个dataFrame的schema

 

        本次案例的目的就是将languagesAtSchool这个字段炸开

        James,,Smith [Java,Scala,C++] CA

        变成下面的模样

        James,Smith Java CA

        James,Smith Scala CA

        JAmes,Smith C++ CA

df.flatMap(f => { val lang = f.getSeq[String](1) lang.map(f.getString(0),_,f.getString(2)))})

        但看上面的操作,可能看不懂。

        getSeq是什么意思,(1)又是什么?        

 

        打印一下lang,返回的是WrappedArray类型的对象。

        因此可以猜想一下,getSeq[String](1)就是获取字符串类型的Seq。就是[Java,Scala,C++]

    lang.map(f.getString(0),_,f.getString(2))

        lang.map(f.getString(0),_,f.getString(2)) =>>

        

        可以看到,[Java,Scala,C++]被拆分了,这就是flatMap的魅力,将数组书中的元素转换成单个的元素。

        同时也可以看到,造成了数据冗余。这波操作就像explode。

        准确一点像是explode + lateral view

  1. select Name,t as language,State
  2. from table
  3. lateral view
  4. explode(split(languages," ,")) temp as t

DataFrame和Sql上的ArrayType

        在处理结构化的文件时,可能会遇到ArrayType类型的数组,或者是其它类型的数组,例如嵌套数组。

        创建数据集并且创建schema映射字段

  1. val seq = Seq(
  2. Row("James,,Smith", List("Java", "Scala", "C++"), List("Spark", "Java"), "OH", "CA"), Row("Michael,Rose,", List("Spark", "Java", "C++"), List("Spark", "Java"), "NY", "NJ"), Row("Robert,,Williams", List("CSharp", "VB"), List("Spark", "Python"), "UT", "NV") )
  3. val structType = new StructType()
  4. .add("name", StringType)
  5. .add("languagesAtSchool", ArrayType(StringType))
  6. .add("languagesAtWork", ArrayType(StringType))
  7. .add("currentState", StringType)
  8. .add("previousState", StringType)

        数据集中,有数组字段ArrayType。在映射字段的时候也使用了ArrayType(StringType)。

        对dataFrame进行输出

 

 

explode

        使用explode函数给数组中的列进行炸开。

dataFrame.select($"name",explode($"languagesAtSchool")).show(false)

 

Split

        对数组进行拆分,和SQL一样,第一个参数是列,第二个参数是分隔符

dataFrame.select(functions.split($"name",",").as("nameAsArray")).show(false)

 

array

        可以将多个列合并成数组

dataFrame.select($"name",array($"currentState",$"previousState").as("Sates")).show(false)

 

总结

        越来越依赖她了,就像一个小太阳一样,一看到她,就会感觉到自己幸福着。让我流连忘返

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/785619
推荐阅读
相关标签
  

闽ICP备14008679号