赞
踩
spark的算子可以分为trans action算子 以及 action算子 ,即变换/转换 算子。如果执行一个RDD算子并不触发作业的提交,仅仅只是记录作业中间处理过程,那么这就是trans action算子 ,相反如果执行这个 RDD 时会触发 Spark Context 提交 Job 作业,那么它就是 action算子及行动算子。
总结来说就是在Spark中,转换算子并不会马上进行运算的,即所谓的“惰性运算”,而是在遇到行动算子时才会执行相应的语句的,触发Spark的任务调度并开始进行计算。
我们可以将行动算子分为两类:
一、数据运算类: 1、reduce 将rdd中的数据进行聚合,先进行分区内聚合,在进行分区间聚合 2、collect 将rdd中的数据按分区号采集,并以数组的形式返回所有数据 3、collectAsMap 收集Key/Value型RDD中的元素,并以map的形式返回数据 4、foreach 循环遍历分区内数据,该算子执行位置是在Executor端 5、count 计算rdd中数据个数 6、first 取rdd中数据的第一个 7、take 取rdd中数据的前num个 8、takeOrdered 将rdd中的数据进行排序后取前num个 9、aggregate 类似于aggregateByKey算子,同样两个参数列表,分别传递初始值和分区内计算规则和分区间计算规则。 10、fold 简化版的aggregate,分区内计算规则和分区间计算规则一样。 11、countByKey 根据键值对中的key进行计数,返回一个map,对应了每个key在rdd中出现的次数。 12、countByValue 根据rdd中数据的数据值进行计数,注不是键值对中的value,同样返回一个map,对应每个数据出现的次数。 13、max 求rdd中数据的最大值 14、min 求rdd中数据的最小值 二、数据存储类: 1、saveAsTextFile 存储为文本文件 2、saveAsObjectFile 存储为二进制文件 3、saveAsSequenceFile 要求数据必须为<k,v>类型, 保存为 Sequencefile文件
注:sequenceFile文件是Hadoop用来存储二进制形式的 (Key,Value) 对而设计的一种平面文件。详细可以看这篇文章了解:链接
通过传入的方法聚集rdd中所有的元素,先聚合分区内的数据,再聚合分区间的数据
def reduce(f: (T, T) => T): T
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val count: Int = rdd.reduce((_: Int) + (_: Int))
数据采集,以数组Array的形式按分区顺序返回数据集中的所有元素
def collect(): Array[T]
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val ints: Array[Int] = rdd.collect()
println(ints.mkString(","))
收集Key/Value型RDD中的元素,并以map的形式返回数据
注:只有key/value类型的RDD才有这个方法
def collectAsMap(): Map[K, V]
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 1)))
val map: collection.Map[String, Int] = rdd2.collectAsMap()
println(map.mkString(","))
循环遍历分区内数据,该算子执行位置是在Executor端
def foreach(f: T => Unit): Unit
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
rdd.collect().foreach(print)
println()
println("********************")
rdd.foreach(print)
返回rdd中元素的个数,即collect返回的数组的长度
def count(): Long
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val count: Long = rdd.count()
println(count)
返回rdd中的第一个元素,即collect返回的数组的第一个元素
def first(): T
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val first: Int = rdd.first()
println(first)
返回rdd中的前n个元素,即collect返回的数组的前n个元素
def take(num: Int): Array[T]
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val ints: Array[Int] = rdd.take(3)
println(ints.mkString(","))
返回rdd中排序后的前n个元素
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
val rdd: RDD[Int] = sc.makeRDD(List(4, 2, 1, 3))
val ints: Array[Int] = rdd.takeOrdered(3)
println(ints.mkString(","))
与aggregateByKey类似,需要传入两个参数列表,列表元素意义也相同
aggregateByKey:初始值只会参与分区内计算
aggregate:初始值既会参与分区内计算也会参与分区间计算
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val res: Int = rdd.aggregate(10)((_: Int) + (_: Int), (_: Int) + (_: Int))
// [1, 2] => 10 + 1 + 2 => 13
// [3, 4] => 10 + 3 + 4 => 17
// [13, 14] => 10 + 13 + 17 = 40
println(res)
类似于foldByKey,即当aggregate的分区内和分区间计算规则相同时可以简化使用fold,只需要传入一个计算规则
def fold(zeroValue: T)(op: (T, T) => T): T
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val res: Int = rdd.fold(10)((_: Int) + (_: Int))
//[1, 2] => 10 + 1 + 2 => 13
//[3, 4] => 10 + 3 + 4 => 17
//[13, 14] => 10 + 13 + 17 = 40
println(res)
用于统计键值对类型的数据中每个key出现的个数
def countByKey(): Map[K, Long]
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 1)))
val res: collection.Map[String, Long] = rdd.countByKey()
println(res)
根据rdd中数据的数据值进行计数,注不是键值对中的value,同样返回一个map,对应每个数据出现的次数。
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 1)))
val res1: collection.Map[Int, Long] = rdd.countByValue()
val res2: collection.Map[(String, Int), Long] = rdd2.countByValue()
println(res1)
println(res2)
返回rdd数据集中的最大值/最小值
def max()(implicit ord: Ordering[T]): T = withScope {
this.reduce(ord.max)
}
def min()(implicit ord: Ordering[T]): T = withScope {
this.reduce(ord.min)
}
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 1)))
println(rdd.max())
println(rdd2.max())
println(rdd.min())
println(rdd2.min())
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。