赞
踩
官网定义:
map(func) :返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数 func 来生成。
mapPartitions(func) :与 map 类似,但是单独的运行在每个 RDD 的 partition(分区,block)上,所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator => Iterator 类型。由于是一次性读取单个分区内的所有数据,所以要注意避免出现OOM的情况
mapPartitionsWithIndex(func) : 与 mapPartitions 类似,但是也需要提供一个代表分区索引的整型值作为参数,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator) => Iterator 类型。
val conf = new SparkConf().setMaster("local[2]").setAppName("RDD") val sc = new SparkContext(conf) val data = sc.parallelize(1 to 5,2) /* 将原rdd中每个数据都+1 */ def getMapPartitions(it:Iterator[Int]) : Iterator[Int]= { var res = for(x <- it) yield x+1 res } /* 打印分区号和对应的数据 */ def getMapPartitionsWithIndex(index:Int,it:Iterator[Int]):Iterator[String] = { var res = it.toList.map(x=> index+" : "+ x) res.toIterator } val res1 = data.map(_*2) val res2 = data.mapPartitions(getMapPartitions) val res3 = data.mapPartitionsWithIndex(getMapPartitionsWithIndex) res1.foreach(x=>print(x+" ")) println("========================================") res2.foreach(x=>print(x+ " ")) println("========================================") res3.foreach(x=>println(x)) sc.stop()
输出结果:
6 2 8 4 10
4 2 5 3 6
0 : 1
0 : 2
1 : 3
1 : 4
1 : 5
注意点:
1.由于这是的测试数据是通过并行化一个集合得到的,所以测试RDD的数据类型是Int型,在定义func getMapPartitions/getMapPartitionsWithIndex的时候,输入参数Iterator也是Int型。
如果我们的测试数据是通过textFile(“XXX”)读取文件得到的,那么我们RDD的数据类型就是String类型,相应的输入参数Iterator也是String类型
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。