赞
踩
多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败。
常见于各种shuffle操作,例如reduceByKey,groupByKey,join等操作。
一个理想的分布式程序:
发生数据倾斜时,任务的执行速度由最大的那个任务决定:
发现数据倾斜的时候,不要急于提高executor的资源,修改参数或是修改程序,首先要检查数据本身,是否存在异常数据。
如果任务长时间卡在最后最后1个(几个)任务,首先要对key进行抽样分析,判断是哪些key造成的。
选取key,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个
df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(_+_).map(k=>(k._2,k._1)).sortByKey(false).take(10)
如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。
经过分析,倾斜的数据主要有以下三种情况:
第1,2种情况,直接对数据进行过滤即可。
第3种情况则需要进行一些特殊操作,常见的有以下几种做法。
reduceByKey
代替 groupByKey
如果使用reduceByKey
因为数据倾斜造成运行失败的问题。具体操作如下:
key
转化为 key + 随机值
(例如Random.nextInt) reduceByKey(func)
key + 随机值
转成 key
reduceByKey(func)
tip1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行union即可。
tips2: 单独处理异常数据时,可以配合使用Map Join解决。
dataFrame
和sparkSql
可以设置spark.sql.shuffle.partitions
参数控制shuffle的并发度,默认为200。
rdd操作可以设置spark.default.parallelism
控制并发度,默认参数由不同的Cluster Manager控制。
局限性: 只是让每个task执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,如果某些key的大小非常大,即使一个task单独执行它,也会受到数据倾斜的困扰。
在小表不是特别大(取决于你的executor大小)的情况下使用,可以使程序避免shuffle的过程,自然也就没有数据倾斜的困扰了。
局限性: 因为是先将小数据发送到每个executor上,所以数据量不能太大。
具体使用方法和处理流程参照:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。