赞
踩
大家好!我是文艺工科女茶哩,本文篇幅比较长,避免迷路,建议收藏再“食用”。
spark处理数据时,会将一整块数据分割成多个分块数据块,这些分块数据块组成的集合,称为RDD(Resilient Distributed Datasets)。
RDD是一种可扩展的弹性分布式数据集,是Spark最基本的数据抽象,表示一个只读、且分区不变的数据集合,是一种分布式的内存抽象,不具备schema的数据结构,可以基于任何数据结构创建。
RDD在代码中是一个抽象类,本质上是一组分布式JVM不可变对象集合
弹性分布式数据集(RDD,Resilient Distributed Datasets),它具备像MapReduce等数据流模型的容错特性,并且允许开发人员在大型集群上执行基于内存的计算。
RDD是只读的、分区记录的集合。RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。
transformatons(转换)
transformations操作会在一个已存在的RDD上传创建一个新的RDD,但实际的计算并没有执行,仅仅是记录操作,所有的计算会发生在actions环节。
actions(动作)
actions动作将执行所有的记录的transformations操作并计算结果,将结果返回到driver程序中,或保存在相关存储系统中。
transform算子
所有的transform都是懒加载的,所谓的懒加载就是在转换时不会立刻计算出结果,只记录数据集的转换过程。当程序驱动要返回结果时,才会开始计算。采用这种策略,使得Spark的运行更加高效。
都有哪些转换动作?
map:类似于python中的map函数,该方法作用于RDD的每个分区的每个元素上
rdd = sc.parallelize(['a','b','c'])
rdd.map(lambda x:(x,1)).collect()
#输出:[('a',1),('b',1),('c',1)]
flatMap:首先应用map函数作用于RDD元素上,然后将结果平坦化,最后返回新的RDD
rdd = sc.parallelize([2,3,4])
result = rdd.flatMap(lambda x:range(1,x))
print(result.collect())
#输出:[1,1,1,2,1,2,3]
filter:筛选出满足条件的元素,以列表形式返回
rdd = sc.parallelize(['abcx','abnn','dfg'])
rdd.filter(lambda x :'ab' in x).collect()
#输出:['abcx','abnn']
union:求得并集
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([5,6,7,8])
rdd1.union(rdd2).collect()
#输出:[1,2,3,4,5,6,7,8]
intersection:求得交集
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([3,4,5,6])
rdd1.intersection(rdd2).collect()
#输出:[3,4]
distinct:去重
rdd = sc.parallelize([1,2,3,4,3,4])
rdd.distinct().collect()
#输出:[1,2,3,4]
sortBy:根据指定Key的方法对RDD内部的元素进行排序
rdd = sc.parallelize(('a',1),('b',2),('1',4))
rdd.sortBy(lambda x:x[0],False).collect()
#输出:[('1',4),('a',1),('b',2)]
mapPartitions:作用于每个分区
action算子有哪些动作?
reduce:
collect:返回列表
count:返回元素的个数
rdd = sc.parallelize([1,2,3,4,5])
rdd.count()
#5
take:返回的是列表形式
rdd.take(1)
#[1]
first:返回的第一个元素
rdd.fisrt()
#1
top:以列表形式返回前k个元素
rdd.top(3)
#[1,2,3]
saveAsTextFile:以字符串形式存储在文件系统中
foreach
foreachPartition
foreach算子遍历RDD内的每一个元素,也可通过传递函数对RDD内的每个元素进行处理,但是没有返回值。而map算子有返回值
mapValues:仅作用在value上
flatMapValues:先执行mapValues再执行flat
combineByKey
reduceByKey:根据key分组,对元素进行操作
groupByKey:
x%2只有两种情况,即0和1,groupBy(lambda x:x%2)表示将数据分成0,1两组,result包含两部分,一部分是Key值,一部分是resultiterable对象,包含的是rdd中满足条件的值
sortByKey:根据RDD的对内部元素进行排序
keys
values
join
leftOuterJoin
rightOuterJoin
collectAsMap:返回字典
countByKey:根据Key统计元素数量,返回字典
countByValue:根据Value统计元素数量,返回字典
RDD的血统通过记录RDD的元数据信息和转换行为,当RDD的部分分区数据发生丢失时,可以根据这些信息重新运算和恢复丢失的数据分区。
宽依赖:指一个父RDD的partition能被多个子RDD的partition所使用,如groupByKey、reduceByKey、sortByKey等操作,也就是一对多的关系。
窄依赖:指每个父RDD的partition最多只能被一个子RDD的partition所使用,如map、filter、union等操作,也就是一对一的关系。
shuffle:shuffle是spark重新分配数据的一种机制,这些数据可以在不同的区域进行分组。
补充:
当持久化一个RDD时,每个节点的其他分区都可以使用RDD在内存中进行计算,在该数据上的其他action操作将直接使用内存中的数据。这样会让以后的action操作计算加快。
1、创建一个1-10数组的RDD,将所有元素*2形成新的RDD
rdd1 = sc.parallelize(range(1,11))
rdd1_result = rdd1.map(lambda x:x*2)
print(rdd1_result.collect())
#输出:[2,4,6,8,10,12,14,16,18,20]
2、创建一个10-20数组的RDD,使用mapPartitions将所有元素*2形成新的RDD .
rdd2 = sc.parallelize(range(10,20))
def f2(iterator):
list = []
for i in iterator:
list.append(i*2)
return list
rdd2_result = rdd2.mapPartitions(f2)
print(rdd2_result.collect())
3、创建一个元素为 1-5 的RDD,运用 flatMap创建一个新的 RDD,新的 RDD 为原 RDD 每个元素的 平方和三次方 来组成 1,1,4,8,9,27…
rdd3 = sc.parallelize(range(1,6))
rdd3_result = rdd3.flatMap(lambda x:(x**2,x**3))
print(rdd3_result.collect())
4、创建一个 4 个分区的 RDD数据为list[10,20,30,40,50,60],使用glom将每个分区的数据放到一个数组。
rdd4 = sc.parallelize([10,20,30,40,50,60],4)
rdd4_result = rdd4.glom()
print(rdd4_result.collect())
#输出:[[10],[20,30],[40],[50,60]]
5、创建一个 RDD数据为list[1, 3, 4, 20, 4, 5, 8],按照元素的奇偶性进行分组。
def f5(x):
if x%2==0:
return "偶数"
else:
return "奇数"
rdd5 = sc.parallelize([1, 3, 4, 20, 4, 5, 8])
rdd5_result = rdd5.groupBy(f5).mapValues(list)
print(rdd5_result.collect())
6、创建一个 RDD(由字符串组成)list[“xiaoli”, “laoli”, “laowang”, “xiaocang”, “xiaojing”, “xiaokong”],
过滤出一个新 RDD(包含“xiao”子串)。
rdd6 = sc.parallelize(["xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong"])
rdd6_result = rdd6.filter(lambda x:"xiao" in x)
print(rdd6_result.collect())
7、创建一个 RDD数据为List[10,10,2,5,3,5,3,6,9,1],对 RDD 中元素执行去重操作。
rdd7 = sc.parallelize([10,10,2,5,3,5,3,6,9,1])
rdd7_result = rdd7.distinct()
print(rdd7_result.collect())
8、创建两个RDD,分别为rdd1和rdd2数据分别为1 to 6和4 to 10,计算差集,两个都算。
rdd8_1 = sc.parallelize(range(1,7))
rdd8_2 = sc.parallelize(range(4,10))
rdd8_result1 = rdd8_1.subtract(rdd8_2)
rdd8_result2 = rdd8_2.subtract(rdd8_1)
print(rdd8_result1.collect())
print(rdd8_result2.collect())
9、创建两个RDD,分别为rdd1和rdd2数据分别为1 to 6和4 to 10,计算交集。
rdd9_1 = sc.parallelize(range(1,7))
rdd9_2 = sc.parallelize(range(4,11))
rdd9_result = rdd9_1.intersection(rdd9_2)
print(rdd9_result.collect())
10、用户表(id,name,age,gender).txt:
001,刘向前,18,0
002,冯 剑,28,1
003,李志杰,38,0
004,郭 鹏,48,1
(1)使用累加器accumulator,计算文件包含学生信息个数。
(2)要求,输出用户信息,gender必须为男或者女,不能为0,1,最终输出格式为
‘’’
rdd10 = sc.textFile("用户表.txt")
count = sc.accumulator(0)
def f10_1(x):
global count
if x != "":
count += 1
rdd10.foreach(f10_1)
print(count)
def f10_2(x):
y = x.split(",")
list = []
for i in y:
if i == "0":
list.append("女")
elif i == "1":
list.append("男")
else:
list.append(i)
return list
def f10_3(x):
print(x[0]+","+x[1]+","+x[2]+","+x[3])
rdd10_result = rdd10.map(f10_2).foreach(f10_3)
补充
数据集格式:
Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,80
#读取数据
rdd = sc.textFile("xx.txt")
rdd1 = rdd.map(lambda x:x.split(","))
1、多少名学生
rdd1.map(lambda x:(x[0],1)).groupByKey().count()
2、多少门课程
rdd1.map(lambda x:(x[1],1)).groupByKey().count()
3、Tom总成绩平均分
Tom_score = rdd1.filter(lambda x:(x[0]=='Tom')).map(lambda x:(int(x[2]))).sum()
Tom_num = rdd1.filter(lambda x:(x[0]=='Tom')).map(lambda x:(int(x[2]))).count()
Tom_score/Tom_num
4、每名同学的选修的课程数量
rdd1.map(lambda x:(x[0],1)).groupByKey().map(lambda x:(x[0],sum(x[1]))).collect()
5、该系DataBase课程共有多少人选修
rdd1.filter(lambda x:(x[1]=='DataBase')).count()
6、各门课程平均分是多少
total_score = rdd1.map(lambda x:(x[1],int(x[2]))).groupByKey().map(lambda x:(x[0],sum(x[1])))
total_num = rdd1.map(lambda x:(x[1],1)).groupByKey().map(lambda x:(x[0],sum(x[1])))
total_score.join(total_num).map(lambda x:(x[0],round(x[1][0]/x[1][1],2))).collect()
#reduceByKey方式
rdd1.map(lambda x:(x[1],(int(x[2]),1))). \
reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])). \
map(lambda x:(x[0],round(x[1][0]/x[1][1],2))).collect()
7、使用累加器计算共有多少人选修DataBase这门课
accum=sc.accumulator(0)
rdd2 = rdd1.filter(lambda x:(x[1]=='DataBase'))
rdd2.foreach(lambda x:accum.add(1))
accum.value
参考
https://blog.csdn.net/HHHBan/article/details/116917993
https://zhuanlan.zhihu.com/p/342048988
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。