赞
踩
定义:云计算是一种通过互联网提供计算服务的技术。相比于传统计算,它的资源获取方式,从“买”变为“租”
优点:
定义 :RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
五大特性:
部署类型 | 私有云 | 公有云 | 社区云 | 混合云 |
---|---|---|---|---|
定义 | 由单一组织独享的云计算资源。 | 由第三方服务提供商提供的云计算资源,面向公众或多个租户。 | 由多个组织共同拥有和使用的云计算资源,通常有相似的需求和目标。 | 结合了私有云和公有云的优点,允许数据和应用在私有云和公有云之间进行迁移和交互。 |
优点 | 1. 安全; 2.自主可控。 | 1.成本低; 2.高扩展性; 3.便捷性。 | 1.成本共享; 2.安全性和隐私性; 3.协作。 | 1.灵活性; 2.成本效益; 3.高可用性。 |
缺点 | 1.成本高; 2.维护复杂; 3.扩展性有限。 | 1.安全性和隐私性; 2.控制权有限; 3.依赖性。 | 1. 复杂性; 2.扩展性受限; 3.合规性问题。 | 1. 管理复杂; 2.安全挑战; 3.集成难度。 |
Infrastructure as a Service(IaaS):基础设备即服务
Platform as a Service(PaaS):平台即服务
Software as a Service(SaaS):软件即服务
分布式集合对象上的API称之为算子
Python中使用pyspark初始化
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':
//构建SparkConf对象
conf = SparkConf().setAppName('helloworld').setMaster('local[*]')
//构建SparkContext执行环境入口对象
sc = SparkContext(conf = conf)
map:是将RDD的数据一条条处理,返回新的RDD
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
#使用map 算子将每个元素乘以 2
mapped_rdd = rdd.map(lambda x: x * 2)
print(mapped_rdd.collect())
#输出: [2, 4, 6, 8, 10]
flatMap: 对rdd执行map操作,然后进行解除嵌套操作
data = ["hello world", "apache spark"]
rdd = sc.parallelize(data)
// 使用 flatMap 算子将每个字符串拆分为单词
flat_mapped_rdd = rdd.flatMap(lambda x: x.split(" "))
print(flat_mapped_rdd.collect())
// 输出: ['hello', 'world', 'apache', 'spark']
sortBy和sortByKey算子都是用于对RDD进行排序的
sortBy
data = [("Alice", 23), ("Bob", 20), ("Charlie", 25)]
rdd = sc.parallelize(data)
//按年龄排序
sorted_rdd = rdd.sortBy(lambda x: x[1])
print(sorted_rdd.collect())
//输出: [('Bob', 20), ('Alice', 23), ('Charlie', 25)]
sortByKey
data = [("Alice", 23), ("Bob", 20), ("Charlie", 25)]
rdd = sc.parallelize(data)
//按名字排序
sorted_rdd = rdd.sortByKey()
print(sorted_rdd.collect())
//输出: [('Alice', 23), ('Bob', 20), ('Charlie', 25)]
rdd = sc.parallelize([('a', 1), ('E', 1), ('C', 1), ('D', 1), ('b', 1), ('g', 1), ('f', 1)], 3)
// 默认按照key进行升序排序
print("默认: ",rdd.sortByKey().collect())
#如果要确保全局有序,排序分区数要给1,不是1的话,只能确保各个分区内排好序整体上不保证
print("多分区: ",rdd.sortByKey(ascending=False,numPartitions=5).collect())
#对排序的key进行处理,拍排序前处理一下key ,让key以你处理的样子进行排序(不影响数据本身)
print("单分区: ",rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower()).collect())
// 默认: [('C', 1), ('D', 1), ('E', 1), ('a', 1), ('b', 1), ('f', 1), ('g', 1)]
// 多分区: [('g', 1), ('f', 1), ('b', 1), ('a', 1), ('E', 1), ('D', 1), ('C', 1)]
// 单分区: [('a', 1), ('b', 1), ('C', 1), ('D', 1), ('E', 1), ('f', 1), ('g', 1)]
reduceByKey和groupByKey算子都是用于对键值对(Pair RDD)进行操作的算子
reduceByKey
data = [("a", 1), ("b", 1), ("a", 2)]
rdd = sc.parallelize(data)
// 对相同键的值进行求和
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
print(reduced_rdd.collect())
// 输出: [('a', 3), ('b', 1)]
groupeByKey
rdd = sc.parallelize([1,2,3,4,5])
// 分组,将数字分层偶数和奇数2个组
rdd2 = rdd.groupBy(lambda num: 'even' if (num % 2 == 0) else 'odd')
// 将rdd2的元素的value转换成list,这样print可以输出内容.
print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())
// [('odd', [1, 3, 5]), ('even', [2, 4])]
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 2), ('b', 3)])
result = rdd.groupBy(lambda t: t[0])
print(result.map(lambda t:(t[0], list(t[1]))).collect())
// [('b', [('b', 2), ('b', 3)]), ('a', [('a', 1), ('a', 1)])]
data = [("a", 1), ("b", 1), ("a", 2)]
rdd = sc.parallelize(data)
// 对相同键的值进行分组
grouped_rdd = rdd.groupByKey()
// 将结果转换为列表以便查看
result = [(k, list(v)) for k, v in grouped_rdd.collect()]
print(result)
// 输出: [('a', [1, 2]), ('b', [1])]
filter算子:返回是True的数据被保留,False的数据被丢弃
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
// 通过Filter算子, 保留奇数
result = rdd.filter(lambda x: x % 2 == 1)
print(result.collect())
// 结果:[1,3,5]
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
// 保留奇数
rdd.filter(lambda x: True if (x % 2 == 1) else False)
print(rdd.filter(lambda x: x % 2 == 1).collect())
// 结果:[1,3,5]
distinct算子:去重操作
rdd1 = sc.parallelize([('a', 1), ('a', 1), ('a', 3)])
print(rdd1.distinct().collect())
// 结果: [('a', 3), ('a', 1)]
union算子:2个rdd合并成1个rdd返回
rdd1 = sc.parallelize([1, 1, 3, 3])
rdd2 = sc.parallelize(["a", "b", "a"])
rdd3 = rdd1.union(rdd2)
print(rdd3.collect())
// 结果: [1, 1, 3, 3, 'a', 'b', 'a']
join: join算子只能用于二元元组
data1 = [("a", 1), ("b", 2)]
data2 = [("a", 3), ("a", 4), ("b", 5)]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
// 内连接
joined_rdd = rdd1.join(rdd2)
print(joined_rdd.collect())
// 输出: [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5))]
leftOuterJoin 算子
data1 = [("a", 1), ("b", 2), ("c", 3)]
data2 = [("a", 3), ("a", 4), ("b", 5)]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
// 左外连接
left_joined_rdd = rdd1.leftOuterJoin(rdd2)
print(left_joined_rdd.collect())
// 输出: [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5)), ('c', (3, None))]
rightOuterJoin 算子
data1 = [("a", 1), ("b", 2), ("d", 4)]
data2 = [("a", 3), ("a", 4), ("b", 5), ("c", 6)]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
// 右外连接
right_joined_rdd = rdd1.rightOuterJoin(rdd2)
print(right_joined_rdd.collect())
// 输出: [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5)), ('c', (None, 6))]
intersection算子:求2个rdd的交集,返回一个新rdd
rdd1 = sc.parallelize([('a', 1), ('a', 3)])
rdd2 = sc.parallelize([('a', 1), ('b', 3)])
// 通过intersection算子求RDD之间的交集, 将交集取出 返回新RDD
rdd3 = rdd1.intersection(rdd2)
print(rdd3.collect())
// 结果:[('a', 1)]
glom:将RDD的数据,加上嵌套,这个嵌套按照分区 来进行
比如RDD数据[1,2,3,4,5]有2个分区那么,被glom后,数据变成:[[1,2,3],[4,5]]
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)
print(rdd.glom().flatMap(lambda x: x).collect())
//结果:[[1, 2, 3, 4], [5, 6, 7, 8, 9]]
countByKey: 统计key出现的次数(一般适用KV型RDD)
rdd = sc.textFile("../data/input/words.txt")
rdd2 = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))
# 通过countByKey来对key进行计数, 这是一个Action算子
# result 不是rdd而是dict
result = rdd2.countByKey()
print(result)
print(type(result))
# 结果:defaultdict(<class 'int'>, {'hello': 3, 'xing': 2, 'qian': 4})<class 'collections.defaultdict'>
reduce:对RDD数据集按照你传入的逻辑进行聚合
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.reduce(lambda a, b: a + b))
# 结果:15
fold:对RDD数据集按照你传入的逻辑进行聚合
这个初始值聚合,会作用在:分区内聚合、分区间聚合
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
print(rdd.fold(10, lambda a, b: a + b))
# 结果:85
#分区1: 123聚合的时候带上10作为初始值得到16=1+2+3+ 10
#分区2: 456聚合的时候带上10作为初始值得到25 =4+5+6+ 10
#分区3: 789聚合的时候带上10作为初始值得到34 =7+8+9+ 10
#3个分区的结果做聚合也带上初始值10,
#所以结果是: 16+25+34+10 = 85
first:取出RDD的第一个元素
result = sc.parallelize([2,4,5,6,7]).first()
print(result)
# 结果 2
take:取RDD的前N个元素,组合成list返回给你
result = sc.parallelize([22,4,5,346,7,56,78,55]).take(5)
print(result)
# 结果 [22, 4, 5, 346, 7]
top:对RDD数据集进行降序排序,取前N个
result = sc.parallelize([22,4,5,346,7,56,78,55]).top(5)
print(result)
# 结果:[346, 78, 56, 55, 22]
count:计算RDD有多少条数据,返回值是一个数字
result = sc.parallelize([22,4,5,346,7,56,78,55]).count(5)
print(result)
# 结果:8
takeSample:随机抽样RDD的数据
rdd = sc.parallelize([1, 3, 5, 3, 1, 3, 2, 6, 7, 8, 6], 1)
print(rdd.takeSample(False, 5, 1))
# 结果:[2, 7, 6, 6, 3]
takeOrdered:对RDD进行排序取前N个
rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)
print(rdd.takeOrdered(3, lambda x: -x))
# 结果: [9, 7, 6]
foreach:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个意思),但是这个方法没有返回值
rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)
#注意,forreach函数没有返回值,因此在里面直接打印了
result = rdd.foreach(lambda x: print(x * 10, end=" "))
#这个result对象是None
print(result)
# 结果:None
saveAsTextFile:将RDD的数据写入文本文件中支持本地写出, hdfs等文件系统.
P.S. foreach 与 saveAsTestFile这两个算子是分区(Executor) 跳过Driver,由分区所在的Executor直接执行。其余的Action算子都会将结果发送至Driver
自用
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。