赞
踩
目录
RDD 基本概念
RDD是什么 | 为什么需要RDD | RDD特性 |
RDD 是一个可读的可分区的分布式数据集,RDD中保存着数据的转换关系,真正的数据存储在各个分区上。分区的设计可以让RDD中的数据被并行操作。
|
数据处理模型:用户友好型
RDD 的分区设计,使得数据可以被并行处理,提高效率。同时用户体验上,感觉让用户是在操作本地的一个集合。 不需要关心具体实现细节(文件分配在那几台机器上,怎么读取,如何汇总,是否会有机器故障等问题),只关心结果就好。
操作RDD就像操作一个本地集合一样简单。不关心任务调度和容错等问题。 容错处理。 |
1、Spark的核心概念是RDD (resilient distributed dataset(弹性分布式数据集)),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。 2、RDD是一个基本的抽象。RDD中不存真正要计算的数据,而是记录了RDD的转换关系(调用什么方法,传入什么函数)RDD是被分区的,真正的数据集存在各个分区(分区有编号)上,每个分区分布在集群中的不同Worker节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)通过对RDD执行操作,其实是对RDD的各个分区中的数据同时并行执行了操作,操作RDD就像操作一个本地集合一样,降低了编程的难度。 ps:同一个stage中,RDD中的一个分区对应一个task,一个分区对应的task只能运行在一台机器上(executor),但是一台机器上可以有多个分区对应的Task. 一个executor有几个线程,就可以同时支持几个task, 而线程的数量取决于executor资源的多少。 3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过RDD的本地创建转换而来。 4、传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。RDD正是解决这一缺点的抽象方法。 RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。 RDD的lineage特性。 5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性) 注意:RDD不是真正的存储数据的单元,RDD只是一个抽象的概念,数据真正存在在RDD对应的partition分区中 |
提示:以下是本篇文章正文内容,下面案例可供参考
产生新的RDD
普通RDD + pair RDD
函数名 | 含义 | 输入 | 示例 | 结果 |
---|---|---|---|---|
union() | 生成一个包含两个RDD中所有元素的RDD,包含重复数据 | rdd={1,2,3}, other={3,4,5} scala中正式写法: rdd = sc.parallelize(List(1,2,3)) other = sc.parallelize(List(3,4,5)) |
rdd.union(other) | {1,2,3,3,4,5} |
intersection() | 求两个RDD共同的元素RDD, 自动去重(需要shuffle, 性能较union差) |
rdd={1,2,3}, other={3,4,5} | rdd.intersection(other) | {3} |
subtract() | 移除一个元素的内容(需要shuffle) | rdd={1,2,3}, other={3,4,5} | rdd.subtract(other) | {1,2} |
cartesian() | 与另一个RDD的笛卡儿积, 选取的是两个RDD的元素的所有组合,开销巨大 |
rdd={1,2,3}, other={3,4,5} | rdd.cartesian(other) | {(1,3),(1,4)...(3,5)} |
map() | 函数应用于RDD中的每个元素 | rdd={1,2,3,3} | rdd.map(x=>x+1) | {2,3,4,4} |
flatMap()和map()的区别: map会将一个长度为N的RDD转换为另一个长度为N的RDD; flatMap会将一个长度为N的RDD转换成一个N个元素的集合,然后再把这N个元素合成到一个单个RDD的结果集。 |
rdd={"zwm hyh fyr", "zwm hyh zll lw"} |
rdd.map(line => line.split(" ")) | {Array("zwm","hyh","fyr"), Array("zwm","hyh","zll","lw")}
|
|
flatMap() | 将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD | rdd={"zwm hyh fyr", "zwm hyh zll lw"} |
rdd.flatMap(line => line.split(" ")) | {"zwm","hyh","fyr","zwm","hyh","zll","lw"} |
filter() | 返回一个通过传给filter()的函数的元素组成的RDD | rdd={1,2,3,3} | rdd.filter(x=>x!=1) | {2,3,3} |
distinct() | 去重( 需要经过数据混洗shuffle, 开销大) | rdd={1,2,3,3} | rdd.distinct() | {1,2,3} |
sample(withReplacement,fraction,[seed]) | 对RDD进行采样,以及是否替换 | rdd.sample(false,0.5) | 非确定 |
以下Transformation操作只作用于Pair RDD {(key1,value1),(key 2,value 2),(key 3,value 3)} 都作用于相同key的value上
函数名 |
含义 |
输入 |
示例 |
结果 |
---|---|---|---|---|
mapValues(f) | 对键值对中的每个value应用一个函数,但不改变键key, 原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素 |
rdd={(1,2),(3,4),(3,6)} | rdd.mapValues(x => x+1) | { (1,3) , (3,5) , (3,7) } |
flatMapValues(f) | 每个一元素的Value被f映射为一系列的值 | rdd={(1,2),(3,4),(3,6)} | rdd.flatMapValues(x => ( x to 5 )) 对于每个元素中的value, 都执行从value到5间的所有元素 比如对于元素(1,2), 执行结果就是(1,2), (1,3), (1,4), (1,5) |
{ (1, 2) , (1, 3) , (1, 4) , (1, 5) , (3, 4) , (3, 5) } |
reduceByKey(f) 作用于具有相同Key 的value上 |
合并key相同的值(value),返回一个由各个key以及各key归约出的结果值的新RDD (key1, result1), (key2, result2), …(keyN, resultN) |
rdd={(1,2),(1,7),(1,3),(3,4),(3,6)} | rdd.reduceByKey( ( x,y) => x+y ) 将具有相同key的value相加 此处的x,y 是具有相同key的value,如果key = 1, 则x=2, y=7, z=3, ruduceByKey执行的是把xyz相加得12,若key =3, 则x=4, y=6,ruduceByKey执行的是把xy相加得10 |
{ (1,12) , (3,10) } |
groupBy() | 对集合中的元素进行分组操作,结果是得到 (key1, (val1, val2)), (key2, (val1, val2)) | rdd= {(java,2), (java,3),(python,3),(python,3),(python,4)} | rdd.groupBy(_._1) 将具有相同key的value变成一个迭代器 |
{ (java, (2, 3)), |
groupByKey() | 对具有相同key的值(value)分组 | rdd={(jave,2), (bigdata,4), (bigdata,6)} | rdd.groupByKey() | { (java,2) , (bigdata, [4,6] ) } |
combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner) createCombiner:分区内 创建组合函数 mergeValue:分区内 合并值函数 mergeCombiners:多分区 合并组合器函数 partitioner:自定义分区数,默认为HashPartitioner mapSideCombine:是否在map端进行Combine操作,默认为true |
使用不同的返回类型合并具有相同键的值 combineByKey会遍历分区中的所有元素,因此每个元素的key要么没遇到过,要么和之前某个元素的key相同。 如果这是一个新的元素,函数会调用createCombiner创建那个key对应的累加器初始值。 如果这是一个在处理当前分区之前已经遇到的key,会调用mergeCombiners把该key累加器对应的当前value与这个新的value合并。 |
rdd={("男", "李四"), ("男", "张三"), ("女", "韩梅梅"), ("女", "李思思"), ("男", "马云")} |
rdd.combineByKey( (x: String) => (List(x), 1), // 元素中的key是首次遇到时,对该key的vaule执行createCombiner (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1), //元素中key非首次遇到,对value执行mergeValue,累加value (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2)) //mergeCombiners 合并分区时用 这里面的x,peo,sex1全部都是value |
(男, ( List( 张三, 李四, 马云),3 ) ) (女, ( List( 李思思, 韩梅梅),2 ) ) 注: elem::List 表示把elem加到List头部 List1:::List2 表示连接两个List |
val rdd08 = sc.parallelize (List((1, 1), (1, 4), (1, 3), (3, 7), (3, 5))) |
val result = rdd08.combineByKey( (v) => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ).map{ case (key, value) => (key, value._1 / value._2.toFloat) } result.collectAsMap().map(println(_)) |
(1,2.66667), (3,6) | ||
keys() | 获取所有key | rdd={(1,2),(3,4),(3,6)} | rdd.keys | {1,3,3} |
values() | 获取所有value | rdd={(1,2),(3,4),(3,6)} | rdd.values | {2,4,6} |
sortByKey() | 根据key排序 | rdd={(1,2),(3,4),(3,6)} | rdd.sortByKey() | { (1,2) , (3,4) , (3,6) } |
subtractByKey | 删掉rdd1中与rdd2的key相同的元素 | rdd1={(1,2),(3,4),(3 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。