赞
踩
map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
举例:
下面例子中把原RDD中每个元素都乘以2来产生一个新的RDD。
val a = sc.parallelize(1 to 9, 3)
val b = a.map(x => x*2)//x => x*2是一个函数,x是传入参数即RDD的每个元素,x*2是返回值
a.collect
//结果Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
b.collect
//结果Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素
val a = sc.parallelize(1 to 4, 2)
val b = a.flatMap(x => 1 to x)//每个元素扩展
b.collect
/*
结果 Array[Int] = Array( 1,
1, 2,
1, 2, 3,
1, 2, 3, 4)
*/
原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。
val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
val rdd2: RDD[(Int, String)] = rdd1.map(x => (x.length, x))
rdd2.mapValues(str=>{
str.length+"_"+str
}).foreach(println)
输出:
(3,3_dog)
(5,5_tiger)
(4,4_lion)
(3,3_cat)
(7,7_panther)
(6,6_ eagle)
flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。
举例:
下述例子中原RDD中每个元素的值被转换为一个序列(从其当前值到5),比如第一个KV对(1,2), 其值2被转换为2,3,4,5。然后其再与原KV对中Key组成一系列新的KV对(1,2),(1,3),(1,4),(1,5)。
val a = sc.parallelize(List((1,2),(3,4),(5,6))) val b = a.flatMapValues(x=>1 to x) b.collect.foreach(println(_)) /* (1,1) (1,2) (3,1) (3,2) (3,3) (3,4) (5,1) (5,2) (5,3) (5,4) (5,5) (5,6) */
flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。它的定义如下:
def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]
举例:
scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,
8, 2, 9)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。