当前位置:   article > 正文

map、mapPartitins、mapPartitionsWithIndex的区别_map mappartitions mappartitionswithindex

map mappartitions mappartitionswithindex

官网定义:
map(func) :返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数 func 来生成。

mapPartitions(func) :与 map 类似,但是单独的运行在每个 RDD 的 partition(分区,block)上,所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator => Iterator 类型。由于是一次性读取单个分区内的所有数据,所以要注意避免出现OOM的情况

mapPartitionsWithIndex(func) : 与 mapPartitions 类似,但是也需要提供一个代表分区索引的整型值作为参数,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator) => Iterator 类型。

val conf = new SparkConf().setMaster("local[2]").setAppName("RDD")
val sc = new SparkContext(conf)
val data = sc.parallelize(1 to 5,2)
/*
将原rdd中每个数据都+1
 */
def getMapPartitions(it:Iterator[Int]) : Iterator[Int]= {
  var res = for(x <- it) yield x+1
  res
}
/*
打印分区号和对应的数据
 */
def getMapPartitionsWithIndex(index:Int,it:Iterator[Int]):Iterator[String] = {
  var res = it.toList.map(x=> index+" : "+ x)
  res.toIterator
}
val res1 = data.map(_*2)
val res2 = data.mapPartitions(getMapPartitions)
val res3 = data.mapPartitionsWithIndex(getMapPartitionsWithIndex)

res1.foreach(x=>print(x+" "))
println("========================================")
res2.foreach(x=>print(x+ " "))
println("========================================")
res3.foreach(x=>println(x))
sc.stop()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

输出结果:
6 2 8 4 10
4 2 5 3 6
0 : 1
0 : 2
1 : 3
1 : 4
1 : 5

注意点:
1.由于这是的测试数据是通过并行化一个集合得到的,所以测试RDD的数据类型是Int型,在定义func getMapPartitions/getMapPartitionsWithIndex的时候,输入参数Iterator也是Int型。
如果我们的测试数据是通过textFile(“XXX”)读取文件得到的,那么我们RDD的数据类型就是String类型,相应的输入参数Iterator也是String类型

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/655861
推荐阅读
相关标签
  

闽ICP备14008679号