当前位置:   article > 正文

Spark RDD 操作 Transformation/Action 以及示例_spark rdd action transformation

spark rdd action transformation

目录

前言

一、Transformation

Pair RDD

二、Action 操作

 

Pair RDD

三、WordCount

统计每个学科中最受欢迎的老师

分组统计

多次过滤

自定义分区器

自定义分区器

总结


 

前言

RDD 基本概念

RDD是什么 为什么需要RDD RDD特性

RDD 是一个可读的可分区的分布式数据集,RDD中保存着数据的转换关系,真正的数据存储在各个分区上。分区的设计可以让RDD中的数据被并行操作。

  • Resilient: 通过使用RDD谱系图(DAG)实现容错。因此,当节点发生故障时,可以进行重新计算。
  • Distributed:  Spark RDD的数据集驻留在多个节点中。
  • Dataset: 您将使用的数据记录。

数据处理模型:用户友好型
rdd 提供了类似Scala 的算子,相比于mapReduce中的两种算子:map/reduce。rdd 提供的数据转换的算子更丰富,功能更强大。

 

RDD 的分区设计,使得数据可以被并行处理,提高效率。同时用户体验上,感觉让用户是在操作本地的一个集合。

不需要关心具体实现细节(文件分配在那几台机器上,怎么读取,如何汇总,是否会有机器故障等问题),只关心结果就好。

 

操作RDD就像操作一个本地集合一样简单。不关心任务调度和容错等问题。

容错处理。

1、Spark的核心概念是RDD (resilient distributed dataset(弹性分布式数据集)),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。

2、RDD是一个基本的抽象。RDD中不存真正要计算的数据,而是记录了RDD的转换关系(调用什么方法,传入什么函数)RDD是被分区的,真正的数据集存在各个分区(分区有编号)上,每个分区分布在集群中的不同Worker节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)通过对RDD执行操作,其实是对RDD的各个分区中的数据同时并行执行了操作,操作RDD就像操作一个本地集合一样,降低了编程的难度。

ps:同一个stage中,RDD中的一个分区对应一个task,一个分区对应的task只能运行在一台机器上(executor),但是一台机器上可以有多个分区对应的Task. 一个executor有几个线程,就可以同时支持几个task, 而线程的数量取决于executor资源的多少。

3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过RDD的本地创建转换而来。

4、传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。RDD正是解决这一缺点的抽象方法。

RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。

RDD的lineage特性。

5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性)

注意:RDD不是真正的存储数据的单元,RDD只是一个抽象的概念,数据真正存在在RDD对应的partition分区中


提示:以下是本篇文章正文内容,下面案例可供参考

一、Transformation

产生新的RDD

普通RDD + pair RDD

函数名 含义 输入 示例 结果
union() 生成一个包含两个RDD中所有元素的RDD,包含重复数据

rdd={1,2,3}, other={3,4,5}

scala中正式写法:

rdd = sc.parallelize(List(1,2,3))

other = sc.parallelize(List(3,4,5))

rdd.union(other) {1,2,3,3,4,5}
intersection() 求两个RDD共同的元素RDD,
自动去重(需要shuffle, 性能较union差)
rdd={1,2,3}, other={3,4,5} rdd.intersection(other) {3}
subtract() 移除一个元素的内容(需要shuffle) rdd={1,2,3}, other={3,4,5} rdd.subtract(other) {1,2}
cartesian() 与另一个RDD的笛卡儿积,
选取的是两个RDD的元素的所有组合,开销巨大
rdd={1,2,3}, other={3,4,5} rdd.cartesian(other) {(1,3),(1,4)...(3,5)}
map()  函数应用于RDD中的每个元素  rdd={1,2,3,3} rdd.map(x=>x+1) {2,3,4,4}

flatMap()和map()的区别:

map会将一个长度为N的RDD转换为另一个长度为N的RDD;

flatMap会将一个长度为N的RDD转换成一个N个元素的集合,然后再把这N个元素合成到一个单个RDD的结果集。

rdd={"zwm hyh fyr",
"zwm hyh zll lw"}
rdd.map(line => line.split(" "))

{Array("zwm","hyh","fyr"),

Array("zwm","hyh","zll","lw")}

 

flatMap() 将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD rdd={"zwm hyh fyr",
"zwm hyh zll lw"}
rdd.flatMap(line => line.split(" "))

{"zwm","hyh","fyr","zwm","hyh","zll","lw"}

filter() 返回一个通过传给filter()的函数的元素组成的RDD rdd={1,2,3,3} rdd.filter(x=>x!=1) {2,3,3}
distinct() 去重( 需要经过数据混洗shuffle, 开销大) rdd={1,2,3,3} rdd.distinct() {1,2,3}
sample(withReplacement,fraction,[seed]) 对RDD进行采样,以及是否替换   rdd.sample(false,0.5) 非确定

Pair RDD

以下Transformation操作只作用于Pair RDD {(key1,value1),(key 2,value 2),(key 3,value 3)} 都作用于相同key的value上

函数名

含义

输入

示例

结果

mapValues(f) 对键值对中的每个value应用一个函数,但不改变键key,
原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素
rdd={(1,2),(3,4),(3,6)} rdd.mapValues(x => x+1) { (1,3) , (3,5) , (3,7) }
flatMapValues(f) 每个一元素的Value被f映射为一系列的值 rdd={(1,2),(3,4),(3,6)} rdd.flatMapValues(x => ( x to 5 ))
对于每个元素中的value, 都执行从value到5间的所有元素
比如对于元素(1,2), 执行结果就是(1,2), (1,3), (1,4), (1,5)
{ (1, 2) ,  (1, 3) ,   (1, 4) , (1, 5) ,  (3, 4) , (3, 5) }
reduceByKey(f)
作用于具有相同Key 的value上
合并key相同的值(value),返回一个由各个key以及各key归约出的结果值的新RDD
(key1, result1), (key2, result2), …(keyN, resultN)
rdd={(1,2),(1,7),(1,3),(3,4),(3,6)} rdd.reduceByKey( ( x,y) => x+y )
将具有相同key的value相加
此处的x,y 是具有相同key的value,如果key = 1, 则x=2, y=7, z=3, ruduceByKey执行的是把xyz相加得12,若key =3, 则x=4, y=6,ruduceByKey执行的是把xy相加得10
{ (1,12) , (3,10) }
groupBy() 对集合中的元素进行分组操作,结果是得到 (key1, (val1, val2)), (key2, (val1, val2)) rdd= {(java,2), (java,3),(python,3),(python,3),(python,4)}

rdd.groupBy(_._1)

将具有相同key的value变成一个迭代器

{ (java, (2, 3)),
(python, (3,3,4))}

groupByKey() 对具有相同key的值(value)分组 rdd={(jave,2), (bigdata,4), (bigdata,6)} rdd.groupByKey() { (java,2) , (bigdata, [4,6] ) }
combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner)
createCombiner:分区内 创建组合函数
mergeValue:分区内 合并值函数
mergeCombiners:多分区 合并组合器函数
partitioner:自定义分区数,默认为HashPartitioner
mapSideCombine:是否在map端进行Combine操作,默认为true
使用不同的返回类型合并具有相同键的值
combineByKey会遍历分区中的所有元素,因此每个元素的key要么没遇到过,要么和之前某个元素的key相同。
如果这是一个新的元素,函数会调用createCombiner创建那个key对应的累加器初始值。
如果这是一个在处理当前分区之前已经遇到的key,会调用mergeCombiners把该key累加器对应的当前value与这个新的value合并。
rdd={("男", "李四"), ("男", "张三"),
("女", "韩梅梅"), ("女", "李思思"), ("男", "马云")}
rdd.combineByKey(
     (x: String) => (List(x), 1),  // 元素中的key是首次遇到时,对该key的vaule执行createCombiner
     (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1), //元素中key非首次遇到,对value执行mergeValue,累加value
     (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2)) //mergeCombiners 合并分区时用
这里面的x,peo,sex1全部都是value
(男, ( List( 张三,  李四,  马云),3 ) )
(女, ( List( 李思思,  韩梅梅),2 ) )
注: elem::List 表示把elem加到List头部
List1:::List2 表示连接两个List
    val rdd08 = sc.parallelize
(List((1, 1), (1, 4), (1, 3), (3, 7), (3, 5)))
val result = rdd08.combineByKey(
    (v) => (v, 1),
    (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    ).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
    result.collectAsMap().map(println(_))
(1,2.66667), (3,6)
keys() 获取所有key rdd={(1,2),(3,4),(3,6)} rdd.keys {1,3,3}
values() 获取所有value rdd={(1,2),(3,4),(3,6)} rdd.values {2,4,6}
sortByKey() 根据key排序 rdd={(1,2),(3,4),(3,6)} rdd.sortByKey() { (1,2) , (3,4) , (3,6) }
subtractByKey 删掉rdd1中与rdd2的key相同的元素 rdd1={(1,2),(3,4),(3
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/736398
推荐阅读
相关标签
  

闽ICP备14008679号