当前位置:   article > 正文

PySpark——性能调优_pyspark调优

pyspark调优

编辑器:jupyter notebook 

  1. import pyspark
  2. from pyspark import SparkContext, SparkConf
  3. from pyspark.sql import SparkSession
  4. import findspark
  5. findspark.init()
  6. # conf = SparkConf().setAppName('test').setMaster('local[*]')
  7. # sc = SparkContext(conf=conf)
  8. # spark SQL 编程
  9. spark = SparkSession \
  10. .builder \
  11. .appName("test") \
  12. .master("local[*]") \
  13. .enableHiveSupport() \
  14. .getOrCreate()
  15. # .config()
  16. # RDD 编程
  17. sc = spark.sparkContext

1.任务监控

  1. spark
  2. # Spark Web UI:http://localhost:4040/
  3. '''
  4. SparkSession - hive
  5. SparkContext
  6. Spark UI
  7. Version
  8. v3.1.2
  9. Master
  10. local[*]
  11. AppName
  12. test
  13. '''

# 一个 py 文件通常会创建一个 app 应用程序
# 一个 app 应用程序包含多个 job 作业,每个 action 操作(动作算子)对应一个 job 作业
# 每个 job 作业通过 shuffle 操作(内部处理器)拆分为若干组 task 任务来执行
# 每组 task 任务被称为 stage 阶段
# 分布式计算引擎就是将一个程序作业拆分为多个任务多个阶段并行来执行

Spark任务的执行情况,包括任务进度,耗时分析,存储分析,Shuffle大小等,常用页面是Stages和Excutors页面

Jobs:每个Action操作对应一个Job,以Job为单位显示进度,还有时间轴Timeline

Stages:Shuffle操作时将Job切分为多个Stage,显示每个Stage的执行进度以及Shuffle大小

  • 点击某个Stage可查看详情,其下面每个Task以及各个Partition的执行情况

Storage:监控缓存及数据存储大小

Environment:显示Spark和Scala版本,各种依赖包及其版本

Excutors:监控各个Excutors的存储和Shuffle的切分情况

SQL:监控各种SQL命令的执行情况

2.缓存和累加器

  • 减少不必要的重复加载与计算
  • 单机模式没有集群,故难以获得性能优势
rdd = sc.parallelize(range(10000000), 4)

优化前

  1. %%time
  2. total = rdd.reduce(lambda x, y: x+y)
  3. cnt = rdd.count()
  4. mean = total/cnt
  5. print(mean)
  6. '''
  7. 4999999.5
  8. CPU times: total: 31.2 ms
  9. Wall time: 5.58 s
  10. '''

优化后

  1. from pyspark.storagelevel import StorageLevel
  2. # 设置缓存
  3. rdd.persist(StorageLevel.MEMORY_ONLY)
  4. # 创建累加器
  5. acc_sum = sc.accumulator(0)
  6. acc_cnt = sc.accumulator(0)
  7. def func(iterator):
  8. for x in iterator:
  9. # 累计求和
  10. acc_sum.add(x)
  11. # 累计计数
  12. acc_cnt.add(1)
  13. %%time
  14. rdd.foreachPartition(func)
  15. mean = acc_sum.value/acc_cnt.value
  16. print(mean)
  17. rdd.unpersist()
  18. '''
  19. 4999999.5
  20. CPU times: total: 0 ns
  21. Wall time: 5.52 s
  22. PythonRDD[3] at RDD at PythonRDD.scala:53
  23. '''

3.数据倾斜调优

  • 单机模式没有集群,故难以获得性能优势
  1. rdd = sc.parallelize(["python"]*1000000+["pandas"]*10000+["spark"]*100, 4)
  2. rdd.take(5)
  3. # 词频统计结果
  4. # [('python', 1000000), ('spark', 100), ('pandas', 10000)]

优化前 

  1. %%time
  2. one = rdd.map(lambda x: (x, 1))
  3. agg = one.reduceByKey(lambda x, y: x+y)
  4. print(agg.collect())
  5. '''
  6. [('python', 1000000), ('spark', 100), ('pandas', 10000)]
  7. CPU times: total: 15.6 ms
  8. Wall time: 4.43 s
  9. '''

优化后

  1. from random import randint
  2. %%time
  3. # 数据倾斜:相同的key会拉到同一task来处理,如果某个key数据量特别大,就会发生数据倾斜
  4. # 原理:两阶段聚合,局部聚合+全局聚合
  5. # 将原本相同的key添加随机数前缀/后缀,数据量特别大的key就会变成多个不同的key
  6. # 那么原本应该被一个task处理的数据就被分配到多个task处理(局部聚合)
  7. # 这样就解决单个task处理数据量过多的问题
  8. # 接着去除掉随机前缀,再次进行全局聚合
  9. one = rdd.map(lambda x: (x, 1))
  10. merge = one.map(lambda x: (x[0]+"_"+str(randint(0, 10)), x[1]))
  11. agg1 = merge.reduceByKey(lambda x, y: x+y)
  12. split = agg1.map(lambda x: (x[0].split("_")[0], x[1]))
  13. agg2 = split.reduceByKey(lambda x, y: x+y)
  14. print(agg2.collect())
  15. '''
  16. [('python', 1000000), ('spark', 100), ('pandas', 10000)]
  17. CPU times: total: 31.2 ms
  18. Wall time: 6.69 s
  19. '''

4.广播变量

  • broadcast+mapPartitions代替join
  1. # 事实表的行数相对多,而维度表相对少
  2. # 维度表尽量转换为Python内置的数据结构
  3. # 产品ID, 订单ID
  4. order = sc.parallelize(
  5. [(3, 1),
  6. (2, 2),
  7. (3, 3),
  8. (2, 4),
  9. (2, 5),
  10. (1, 6),
  11. (1, 7),
  12. (3, 8),
  13. (1, 9)], 4)
  14. # 产品ID, 产品价格
  15. product = sc.parallelize(
  16. [(1, 56),
  17. (2, 43),
  18. (3, 26)])
  19. # 产品ID, 订单ID, 产品价格
  20. # [(1, 6, 56),
  21. # (1, 7, 56),
  22. # (1, 9, 56),
  23. # (2, 2, 43),
  24. # (2, 4, 43),
  25. # (2, 5, 43),
  26. # (3, 1, 26),
  27. # (3, 3, 26),
  28. # (3, 8, 26)]

优化前

  1. %%time
  2. join = order.join(product)
  3. result = join.map(lambda x: (x[0], x[1][0], x[1][1]))
  4. print(result.collect())
  5. '''
  6. [(1, 6, 56), (1, 7, 56), (1, 9, 56), (2, 2, 43), (2, 4, 43), (2, 5, 43), (3, 1, 26), (3, 3, 26), (3, 8, 26)]
  7. CPU times: total: 15.6 ms
  8. Wall time: 26 s
  9. '''

优化后

  1. # 将键值对RDD转换为Python字典,并创建广播变量
  2. bc = sc.broadcast(product.collectAsMap())
  3. dict_product = bc.value
  4. # {1: 56, 2: 43, 3: 26}
  5. def join_product(order):
  6. result = []
  7. for x in order:
  8. product_id = x[0]
  9. order_id = x[1]
  10. price = dict_product[product_id]
  11. row = (product_id, order_id, price)
  12. result.append(row)
  13. return result
  14. %%time
  15. join = order.mapPartitions(join_product)
  16. print(join.collect())
  17. '''
  18. [(3, 1, 26), (2, 2, 43), (3, 3, 26), (2, 4, 43), (2, 5, 43), (1, 6, 56), (1, 7, 56), (3, 8, 26), (1, 9, 56)]
  19. CPU times: total: 31.2 ms
  20. Wall time: 2.22 s
  21. '''

5.高性能算子

  • 避免使用低性能算子,尽量使用高性能算子
  • map -> mapPartitions
  • groupByKey -> reduceByKey/aggregateByKey
  • foreach -> foreachPartitions
  • filter -> filter+coalesce
  • repartition+sort -> repartitionAndSortWithinPartitions
  1. rdd = sc.parallelize(
  2. [(1, "杨玲"),
  3. (2, "程慧"),
  4. (1, "陈博"),
  5. (1, "褚佳"),
  6. (1, "周鹏"),
  7. (2, "戴霞")], 2)
  8. # [(2, ['程慧', '戴霞']), (1, ['杨玲', '陈博', '褚佳', '周鹏'])]

优化前

  1. %%time
  2. result = rdd.groupByKey().map(lambda x: (x[0], list(x[1])))
  3. print(result.collect())
  4. '''
  5. [(2, ['程慧', '戴霞']), (1, ['杨玲', '陈博', '褚佳', '周鹏'])]
  6. CPU times: total: 15.6 ms
  7. Wall time: 2.25 s
  8. '''

优化后

  1. %%time
  2. # groupByKey会产生大量shuffle(将task拆分为n个stage来完成,shuffle数量过多会影响性能)
  3. # reduceByKey/aggregateByKey
  4. # 每个分区内部先做一次合并,然后对每个分区的执行结果再做一次合并
  5. # 大大减少了shuffle数量
  6. result = rdd.aggregateByKey(
  7. [],
  8. # x为初始值“[]”
  9. lambda x, y: x+[y],
  10. lambda x, y: x+y)
  11. print(result.collect())
  12. '''
  13. [(2, ['程慧', '戴霞']), (1, ['杨玲', '陈博', '褚佳', '周鹏'])]
  14. CPU times: total: 31.2 ms
  15. Wall time: 2.33 s
  16. '''

6.SparkSQL

# 尽量使用SparkSQL,其计算引擎会自动优化

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/679713
推荐阅读
相关标签
  

闽ICP备14008679号