Editor: Jupyter Notebook
import findspark
findspark.init()  # must run before importing pyspark so the Spark installation is on the path
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# conf = SparkConf().setAppName('test').setMaster('local[*]')
# sc = SparkContext(conf=conf)

# Spark SQL programming
spark = SparkSession \
    .builder \
    .appName("test") \
    .master("local[*]") \
    .enableHiveSupport() \
    .getOrCreate()
# .config()

# RDD programming
sc = spark.sparkContext
spark
# Spark Web UI: http://localhost:4040/

'''
SparkSession - hive
SparkContext
Spark UI
Version
v3.1.2
Master
local[*]
AppName
test
'''
# One .py file (or notebook session) normally corresponds to one application (app)
# An app contains multiple jobs; every action (action operator) triggers one job
# Each job is split at shuffle boundaries into groups of tasks
# Each such group of tasks is a stage
# A distributed compute engine breaks one program into many stages and tasks and runs them in parallel
The Spark Web UI shows how the application is executing: job progress, timing, storage usage, shuffle sizes, and so on. The most frequently used pages are Stages and Executors.
Jobs: one job per action; progress is shown per job, together with an event Timeline
Stages: shuffles split a job into multiple stages; shows each stage's progress and shuffle read/write sizes
Storage: monitors cached data and its storage size
Environment: Spark and Scala versions, plus all dependencies and their versions
Executors: storage usage and shuffle statistics for each executor
SQL: execution details of SQL / DataFrame queries
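As a quick illustration of the Jobs and Stages pages (a minimal sketch using the sc created above; the demo RDD is only an assumed example), every action triggers one job, and the shuffle introduced by distinct() splits its job into two stages:

print(sc.uiWebUrl)        # address of the Web UI for this application
demo = sc.parallelize(range(1000), 4)
demo.count()              # action 1 -> one job with a single stage (no shuffle)
demo.distinct().count()   # action 2 -> one job with two stages (distinct() shuffles)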
rdd = sc.parallelize(range(10000000), 4)
%%time
# Baseline: reduce() and count() are two separate actions, so the RDD is scanned twice
total = rdd.reduce(lambda x, y: x+y)
cnt = rdd.count()
mean = total/cnt
print(mean)
'''
4999999.5
CPU times: total: 31.2 ms
Wall time: 5.58 s
'''
from pyspark.storagelevel import StorageLevel
# Cache the RDD in memory so it is not recomputed across actions
rdd.persist(StorageLevel.MEMORY_ONLY)

# Create accumulators for the running sum and count
acc_sum = sc.accumulator(0)
acc_cnt = sc.accumulator(0)


def func(iterator):
    for x in iterator:
        # accumulate the sum
        acc_sum.add(x)
        # accumulate the count
        acc_cnt.add(1)


%%time
rdd.foreachPartition(func)
mean = acc_sum.value/acc_cnt.value
print(mean)
rdd.unpersist()
'''
4999999.5
CPU times: total: 0 ns
Wall time: 5.52 s
PythonRDD[3] at RDD at PythonRDD.scala:53
'''
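To confirm that the cache actually took effect (an aside, not part of the original notebook; is_cached and getStorageLevel() are standard RDD members), inspect the RDD before calling unpersist(), or open the Storage page of the Web UI:

print(rdd.is_cached)          # True between persist() and unpersist()
print(rdd.getStorageLevel())  # e.g. Memory Serialized 1x Replicated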
- rdd = sc.parallelize(["python"]*1000000+["pandas"]*10000+["spark"]*100, 4)
- rdd.take(5)
- # 词频统计结果
- # [('python', 1000000), ('spark', 100), ('pandas', 10000)]
- %%time
- one = rdd.map(lambda x: (x, 1))
- agg = one.reduceByKey(lambda x, y: x+y)
- print(agg.collect())
- '''
- [('python', 1000000), ('spark', 100), ('pandas', 10000)]
- CPU times: total: 15.6 ms
- Wall time: 4.43 s
- '''
from random import randint

%%time
# Data skew: records with the same key go to the same task; if one key carries far
# more data than the others, that task becomes the bottleneck (skew)
# Idea: two-phase aggregation = local aggregation + global aggregation
# Append a random suffix to each key, so a very hot key becomes several distinct keys
# and the data that would have gone to one task is spread across several tasks (local aggregation)
# This removes the single overloaded task; then strip the random suffix and aggregate again globally
one = rdd.map(lambda x: (x, 1))
merge = one.map(lambda x: (x[0]+"_"+str(randint(0, 10)), x[1]))
agg1 = merge.reduceByKey(lambda x, y: x+y)
split = agg1.map(lambda x: (x[0].split("_")[0], x[1]))
agg2 = split.reduceByKey(lambda x, y: x+y)
print(agg2.collect())
'''
[('python', 1000000), ('spark', 100), ('pandas', 10000)]
CPU times: total: 31.2 ms
Wall time: 6.69 s
'''
# The fact table has relatively many rows, the dimension table relatively few
# Convert the dimension table into a built-in Python data structure whenever possible
# (product ID, order ID)
order = sc.parallelize(
    [(3, 1),
     (2, 2),
     (3, 3),
     (2, 4),
     (2, 5),
     (1, 6),
     (1, 7),
     (3, 8),
     (1, 9)], 4)

# (product ID, product price)
product = sc.parallelize(
    [(1, 56),
     (2, 43),
     (3, 26)])

# Expected: (product ID, order ID, product price)
# [(1, 6, 56),
#  (1, 7, 56),
#  (1, 9, 56),
#  (2, 2, 43),
#  (2, 4, 43),
#  (2, 5, 43),
#  (3, 1, 26),
#  (3, 3, 26),
#  (3, 8, 26)]
%%time
# Ordinary join: both RDDs are shuffled (repartitioned by key) across the cluster
join = order.join(product)
result = join.map(lambda x: (x[0], x[1][0], x[1][1]))
print(result.collect())
'''
[(1, 6, 56), (1, 7, 56), (1, 9, 56), (2, 2, 43), (2, 4, 43), (2, 5, 43), (3, 1, 26), (3, 3, 26), (3, 8, 26)]
CPU times: total: 15.6 ms
Wall time: 26 s
'''
# Convert the key-value RDD into a Python dict and broadcast it to every executor
bc = sc.broadcast(product.collectAsMap())
bc.value
# {1: 56, 2: 43, 3: 26}

def join_product(orders):
    # read bc.value inside the function so executors use the broadcast copy
    # instead of a dict captured in the task closure
    dict_product = bc.value
    result = []
    for x in orders:
        product_id = x[0]
        order_id = x[1]
        price = dict_product[product_id]
        row = (product_id, order_id, price)
        result.append(row)
    return result

%%time
join = order.mapPartitions(join_product)
print(join.collect())
'''
[(3, 1, 26), (2, 2, 43), (3, 3, 26), (2, 4, 43), (2, 5, 43), (1, 6, 56), (1, 7, 56), (3, 8, 26), (1, 9, 56)]
CPU times: total: 31.2 ms
Wall time: 2.22 s
'''
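Once the lookup table is no longer needed, the broadcast copies can be released explicitly (an aside; unpersist() and destroy() are standard Broadcast methods, not shown in the original notebook):

bc.unpersist()   # drop the cached copies on the executors; re-broadcast lazily if used again
# bc.destroy()   # release everything; the broadcast variable cannot be used afterwards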
rdd = sc.parallelize(
    [(1, "杨玲"),
     (2, "程慧"),
     (1, "陈博"),
     (1, "褚佳"),
     (1, "周鹏"),
     (2, "戴霞")], 2)
# [(2, ['程慧', '戴霞']), (1, ['杨玲', '陈博', '褚佳', '周鹏'])]
%%time
result = rdd.groupByKey().map(lambda x: (x[0], list(x[1])))
print(result.collect())
'''
[(2, ['程慧', '戴霞']), (1, ['杨玲', '陈博', '褚佳', '周鹏'])]
CPU times: total: 15.6 ms
Wall time: 2.25 s
'''
%%time
# groupByKey shuffles every single record (each shuffle splits the job into more
# stages, and too much shuffled data hurts performance)
# reduceByKey/aggregateByKey combine values inside each partition first and then
# merge the per-partition results, which greatly reduces the amount shuffled
# (a reduceByKey version of the same grouping is sketched after this cell)
result = rdd.aggregateByKey(
    [],
    # x starts from the zero value []
    lambda x, y: x+[y],
    lambda x, y: x+y)
print(result.collect())
'''
[(2, ['程慧', '戴霞']), (1, ['杨玲', '陈博', '褚佳', '周鹏'])]
CPU times: total: 31.2 ms
Wall time: 2.33 s
'''
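For reference, the reduceByKey variant named in the comment above expresses the same grouping (a minimal sketch on the same rdd; timing not measured here):

result = rdd.map(lambda x: (x[0], [x[1]])) \
            .reduceByKey(lambda a, b: a + b)
print(result.collect())
# [(2, ['程慧', '戴霞']), (1, ['杨玲', '陈博', '褚佳', '周鹏'])]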
# Prefer Spark SQL whenever possible; its execution engine optimizes queries automatically
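For example, the earlier word count can be written with the DataFrame API (a minimal sketch reusing the spark session from above; the column name "word" is an assumption), and the optimizer adds the partial map-side aggregation before the shuffle on its own:

words = sc.parallelize(["python"]*1000000 + ["pandas"]*10000 + ["spark"]*100, 4)
df = words.map(lambda w: (w,)).toDF(["word"])
df.groupBy("word").count().show()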