当前位置:   article > 正文

Spark|Spark 的 tranformer 算子和 action 算子整理_tranformer的核心算子

tranformer的核心算子

Transformer 算子

Spark 官方文档地址:https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

map(func)

通过将源数据集的每个元素传递给一个函数 func,返回一个新的 RDD。

filter(func)

通过选择在函数 func 返回 True 的源数据集元素,返回一个新的 RDD。

flatMap(func)

map 类似,但每个输入项可以映射到 0 个或多个输出项(因此 func 应该返回一个序列而不是单个项)。

mapPartitions(func)

类似于 map,但在 RDD 的每个分区上单独运行;当在类型为 T 的 RDD 上运行时,func 的输入为 Iterator<T>,返回值为 Iterator<U>

mapPartitionsWithIndex(func)

类似于 mapPartitions,但还向 func 提供一个表示分区索引的整数值;当在类型为 T 的 RDD 上运行时,func 的输入为索引 intIterator<T>,返回值为 Iterator<U>

sample(withReplacement, fraction, seed)

使用给定的随机种子,在数据中以比例 fraction 进行有放回或无放回抽样,并返回抽样后新的 RDD。

  • withReplacement:是否进行放回抽样
  • fraction:抽样比例
  • seed:随机种子
union(otherDataset)

返回一个新的 RDD,其中包含源数据集和参数中元素的并集。

intersection(otherDataset)

返回一个新的 RDD,其中包含源数据集和参数中元素的交集。

distinct([numPartitions]))

返回一个新的 RDD,其中包含源数据集的不重复元素。

  • numPartitions(可选参数):生成的 RDD 的分区数。如果未提供此参数,则使用原始 RDD 的分区数。
groupByKey([numPartitions])

当应用于 (K, V) 对的数据集时,调用 groupByKey([numPartitions]) 函数将返回一个包含 (K, Iterable<V>) 对的数据集。

如果需要对每个键执行聚合(例如求和或平均值),那么使用 reduceByKeyaggregateByKey 将获得更好的性能。

默认情况下,输出的并行级别取决于父 RDD 的分区数。您可以传递一个可选的 numPartitions 参数来设置不同的任务数。

  • numPartitions(可选参数):生成的 RDD 的分区数。如果未提供此参数,则使用原始 RDD 的分区数。
reduceByKey(func, [numPartitions])

当应用于 (K, V) 对的数据集时,调用 reduceByKey(func, [numPartitions]) 函数将返回一个包含 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数 func 进行聚合,该函数必须是 (V, V) => V 类型。与 groupByKey 类似,通过可选的第二个参数来配置 reduce 任务的数量。

  • numPartitions(可选参数):生成的 RDD 的分区数。如果未提供此参数,则使用原始 RDD 的分区数。
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])

当应用于 (K, V) 对的数据集时,调用 aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) 函数将返回一个包含 (K, U) 对的数据集,其中每个键的值使用给定的组合函数(combOp)和中性的 “零” 值(zeroValue)进行聚合。这允许使用不同于输入值类型的聚合值类型,同时避免不必要的内存分配。与 groupByKey 类似,第二个可选参数可配置 reduce 任务的数量。

  • zeroValue(zero value):在聚合过程中使用的初始值或 “零” 值。它提供了一个累加器,用于每个键的聚合操作。
  • seqOp(sequence operation function):序列操作函数,它定义了如何将每个值与累加器进行组合。它接收两个参数,第一个参数是累加器的值,第二个参数是要进行聚合的值。
  • combOp(combination operation function):组合操作函数,它定义了如何将来自不同分区的局部聚合结果进行合并。类似于 reduceByKey 中的 reduce 操作。它接收两个参数,分别是两个分区的局部聚合结果。
  • numPartitions(可选参数):生成的 RDD 的分区数。如果未提供此参数,则使用原始 RDD 的分区数。
sortByKey([ascending], [numPartitions])

当应用于 (K, V) 对的数据集(其中 K 实现了 Ordered 接口)时,调用 sortByKey(ascending: Boolean) 函数将返回一个按键以升序或降序排序的 (K, V) 对的数据集,排序方式由布尔类型的 ascending 参数指定。

join(otherDataset, [numPartitions])

当应用于 (K, V)(K, W) 的数据集时,调用 join(otherDataset) 函数时将返回一个包含 (K, (V, W)) 对的数据集,其中每个键的所有元素对都会存在。通过 leftOuterJoinrightOuterJoinfullOuterJoin 支持外连接操作。

cogroup(otherDataset, [numPartitions])

当应用于 (K, V)(K, W) 的数据集时,调用 cogroup(otherDataset, [numPartitions]) 函数将返回一个包含 (K, (Iterable<V>, Iterable<W>)) 的元组的数据集。 这个操作也被称为 groupWith

cartesian(otherDataset)

当在类型为 TU 的数据集上,调用 cartesian(otherDataset) 函数时,将返回一个包含 (T, U) 对的数据集,其中包含所有元素的配对。

pipe(command, [envVars])

使用 pipe(command) 函数可以将 RDD 的每个分区通过一个 shell 命令进行处理,比如一个 Perl 或 bash 脚本。RDD 的元素会被写入到进程的标准输入(stdin),而进程的标准输出(stdout)中的每一行会作为字符串的 RDD 返回。

coalesce(numPartitions)

使用 coalesce(numPartitions) 函数可以将 RDD 的分区数减少为 numPartitions。这在对一个大数据集进行筛选后,希望更高效地执行操作时非常有用。

repartition(numPartitions)

使用 repartition(numPartitions) 函数可以对 RDD 中的数据进行随机重分区,创建更多或更少的分区,并在它们之间实现负载均衡。这将始终通过网络对所有数据进行洗牌。

repartitionAndSortWithinPartitions(partitioner)

使用 repartitionAndSortWithinPartitions(partitioner) 函数可以根据给定的分区器对 RDD 进行重新分区,并在每个结果分区中按键对记录进行排序。这比先调用 repartition 然后在每个分区内进行排序更高效,因为它可以将排序操作下推到洗牌机制中。

Action 算子

Spark 官方文档地址:https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

reduce(func)

使用一个函数 func(接受两个参数并返回一个结果)对数据集的元素进行聚合。该函数应当满足可交换和结合的性质,以便可以正确地在并行计算中进行处理。

collect()

将数据集的所有元素作为数组返回给驱动程序。在筛选或其他返回足够小的数据子集的操作之后,通常会很有用。

count()

返回数据集中的元素数量。

first()

返回数据集的第一个元素(类似于 take(1) 的功能)。

take(n)

返回数据集的前 n 个元素作为一个数组。

takeSample(withReplacement, num, [seed])

返回数据集中 num 个元素的随机样本数组,可以使用 withReplacement 选择是否返回抽样,可使用 seed 选择随机种子。

takeOrdered(n, [ordering])

使用自然顺序或自定义比较器,返回 RDD 的前 n 个元素。

saveAsTextFile(path)

将数据集的元素以文本文件(或一组文本文件)的形式写入到指定目录中,可以是本地文件系统、HDFS 或任何其他受 Hadoop 支持的文件系统。Spark 会调用每个元素的 toString 方法将其转换为文件中的文本行。

saveAsSequenceFile(path)【Java and Scala】

将数据集的元素以 Hadoop SequenceFile 的形式写入到指定路径中,可以是本地文件系统、HDFS 或任何其他受 Hadoop 支持的文件系统。这适用于实现了 Hadoop 的 Writable 接口的键值对 RDD。在 Scala 中,还可以用于隐式转换为 Writable 的类型(Spark 包含基本类型如 Int、Double、String 等的转换)的 RDD。

saveAsObjectFile(path)【Java and Scala】

使用 Java 序列化以简单的格式将数据集的元素写入,然后可以使用 SparkContext.objectFile() 加载。

countByKey()

仅适用于键值对类型 (K,V) 的 RDD。返回一个 HashMap,其中包含每个键的计数作为 (K,Int) 对。

foreach(func)

对数据集的每个元素运行函数 func。通常这样做是为了进行副作用,比如更新累加器或与外部存储系统交互。 注意:在 foreach() 之外修改除累加器之外的变量可能会导致未定义的行为。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/736408
推荐阅读
相关标签
  

闽ICP备14008679号