赞
踩
Spark 数据倾斜(Data Skew)是一个比较常见的问题。它指的是数据分布不均匀,部分key对应的value数据过多。
有的数据要处理较多,任务执行速度受制于这部分数据。
data = spark.read.text("data.txt")
wordCounts = data
.select(explode(split(data.value," ")).alias("word"))
.groupBy("word")
.count()
wordCounts.show()
如果 data.txt 的内容是:
hello hello scala spark spark hi
spark spark hive hive hive hive
spark spark spark spark spark ... #集中很多spark
然后查看执行计划:
wordCounts.explain()
可以看到:
这意味着:
解决方案是:
使得每个tasks负责的数据量更均匀,避免bottlenecks。
处理大量value数据需要更多内存。
同上述案例:
由于spark
单词的数据集中特别多,占用的内存就会特别大。
如果超出了Executor的内存限制,就会发生内存溢出错误。
如 OutOfMemoryError: Java heap space 或者 ExecutorLostFailure。
为了解决这个问题,可以:
你可以通过两种方式调整partitions的数量:
# 指定200个partitions
wordCounts.groupBy("word").partitionBy(200).count()
repartition()
方法调整partitions数量。比如:# 重新partition为200个
wordCounts = wordCounts.repartition(200)
wordCounts.groupBy("word").count()
Spark会将原始partitions的数据均匀分配到新指定的partitions上。
调整partitions数量的好处:
在datasets特别大的数据倾斜情况下,需要:
另外,还可以使用 coalesce()
动态调整partitions数量。
总的来说,通过合理调整partitions数量,可以有效缓解数据倾斜。
有的数据要广播、shuffle、join等更多次。
Broadcast
将驱动器中的变量广播到executor进程。
如果数据倾斜严重,就需要频繁广播集中在一个分区的数据给所有executor。
Shuffle
shuffle操作会产生更多的数据交换,最明显的一个例子就是 groupBy 操作。
对于数据倾斜的情况:
Join
join也依赖于 shuffle 进行。
如果一张表的数据倾斜严重,join时就需要将它交换给更多任务。
举个例子:
# 倾斜表
df1 = spark.createDataFrame([(1,'a')], ['id','word'])
df1 = df1.unionAll(df1).unionAll(df1) # 加大一列的数据量
# 正常表
df2 = spark.createDataFrame([(1,'b')], ['id','word'])
# join
result = df1.join(df2, 'id')
result.count()
这里df1的数据量明显大于df2,join时就需要将df1交换给更多任务。这说明数据倾斜会增加数据交换,影响性能。
比如说我们有一份关于用户行为数据:
用户ID | 操作 |
---|---|
1 | 点赞 |
2 | 转发 |
3 | 评论 |
4 | 购买 |
… | … |
999 | 点赞 |
1000 | 点赞 |
1001 | 点赞 |
由于数据本身就存在偏差,点赞操作的数量远远多于其他类型操作。
然后我们进行分组统计:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.option("header", "true").csv("data.csv")
result = df.groupBy("操作").count()
result.show()
我们会发现:
这就是数据本身就存在倾斜的例子。
解决方案是:
总的来说,当数据本身存在很明显的偏差时,就需要针对这种倾斜来进行优化。
不合理的join条件也会导致数据倾斜,从而增加数据交换。
举个例子:
# 表1 数据倾斜,某一列值更集中
df1 = spark.createDataFrame([(1,'a'),(2,'b'),(1,'c'),(1,'d')],['id','word'])
# 表2 没有倾斜
df2 = spark.createDataFrame([(1,'x'),(2,'y'),(3,'z')],['id','word'])
# 按 id 列 join
result = df1.join(df2, 'id')
result.count()
这里的数据集 df1 的 id 列值为1的行比较多。
那么在id列上进行join时,就需要将df2对应id=1的数据broad cast(交换)给更多任务。
因此,在join条件上使用倾斜严重的数据集,就会增加数据交换。
解决方案是:
总的来说,不合理的join条件会增加数据倾斜,从而增加数据交换和shuffle时期。
这里有一个更清楚的例子:
假设我们有一个用户表,其中一列是user_id
和一列hash_key
:
user_id | hash_key |
---|---|
1 | a |
2 | b |
3 | c |
4 | a |
… | … |
我们根据hash_key列进行分组聚合:
df.groupBy("hash_key").count()
但是如果hash_key是通过取模生成的:
hash_key = user_id % 3
这会产生大量冲突:
因此,这样不合理的key(hash_key
)会导致:
解决方案是:
总的来说,使用不合理的key,可能会产生大量数据在同一组下,导致倾斜。
选择能更均匀分布的数据作为Key。
优化Key就是选择一个能更均匀分布的数据作为分组(group) 的key,来缓解数据倾斜。
举个例子:
我们有一个用户表,包含user_id
和sex
两列:
user_id | sex |
---|---|
1 | male |
2 | female |
3 | male |
4 | female |
然后我们原本按sex
列进行分组:
df.groupBy("sex").count()
问题是数据中男性用户明显多于女性用户,sex
列存在倾斜。
我们可以优化key,使用user_id
作为分组key:
df.groupBy("user_id").count()
因为user_id
是连续的ID,它的分布非常均匀。
这样做有两个好处:
sex
) 导致性能问题总的来说,优化key就是使用分布更均匀的数据作为分组key,来缓解数据倾斜。
如数据汇聚、数据过滤等方式。
当数据存在倾斜时,我们可以通过以下方式降低倾斜程degree:
直接过滤掉部分倾斜严重的数据。
例如:
# 原数据存在倾斜
df = ...
# 过滤掉部分数据
filte_df = df.filter(df["col"] < 1000)
将部分倾斜数据聚合成一组。
例如:
# 将某一列倾斜的数据聚合
agg_df = df.groupBy("col").agg(f.first("col"), f.count("col"))
# 取col> 1000的汇总数据
agg_df = agg_df.filter(agg_df["col"] > 1000)
对数据进行重采样,降低倾斜程度。
from pyspark.mllib.sampling import SamplingUtils
# 原始数据
df = ...
# 下采样倾斜字段
sampled_df = SamplingUtils.sampleBy(df, "col", 0.5)
总的来说,我们可以通过过滤、聚合和重采样等方式,降低数据倾斜程度。这使得后续的分区、聚集等操作更均匀。
将数据分散到更多分区,降低每个分区数据量。
调整分区(repartition)也是重要的缓解数据倾斜方法。
例如,我们有个数据表含有城市信息:
id | city |
---|---|
1 | 北京 |
2 | 北京 |
3 | 上海 |
4 | 成都 |
5 | 成都 |
… | … |
数据中北京城的数据量明显多于其他城市。
然后我们按城市分组统计:
df.groupBy("city").count()
计算会分配给不同的任务(partitions)。
由于北京城的数据量多,它会分配给较少的分区。
这就导致部分分区负载过重,性能下降。
为缓解这个问题,我们可以增加分区数量:
df.repartition(100)\
.groupBy("city")
.count()
这里我们将分区数增加到100个。
这有助于:
总的来说,当数据存在严重倾斜时,我们可以增加分区数量,将集中的数据分散到多个分区,以提高整体效率。
根据当前数据情况动态增加分区。
动态调整分区是指在程序执行的过程中,根据需要增加或减小分区的数量。
这可以优化Spark作业的执行效率。
例如:
#读取数据时指定分区数量
df = spark.read.option("partitionOverwrite","true").option("numPartitions",100).csv("data.csv")
# 在group by 时,进一步增加分区数量
df = df.repartition(200)
.groupBy("id")
.count()
# 再进行 join 时,可以适当减少分区
df = df.repartition(150)
.join(other_df, "id")
这里我们:
动态调整分区的好处是:
总的来说,我们可以在不同阶段根据情况动态调整分区数量,达到最佳效果。
限制每个 Executor 能占用的内存量。
当数据倾斜严重时,单个分区的内存消耗可能会非常高。
为了解决这个问题,我们可以通过以下方式来控制每个executor的内存使用:
spark.sql.shuffle.partitions 这个参数决定了shuffle阶段生成的分区数量。
调大这个参数能有助于控制每个分区的内存消耗。
通过spark.executor.memory参数设置每个executor分配的内存上限。
这能限制单个executor的内存使用。
对于内存消耗大的算子,我们可以设置它的内存限制。
例如:
//设置 groupBy算子使用的最大内存
df.groupBy(...).sqlContext.setConf("spark.sql.groupBy.sort.maxMemory", "2g")
可以限制 Spark 作业整体使用的内存。
// 限制 Spark 使用的内存为 10G
SparkContext.setConf("spark.driver.memory", "10g")
总的来说,通过设置相关参数,我们可以限制单个executor或整体使用的内存上限。这有助于缓解由于数据倾斜导致的内存溢出问题。
在出现 OOM 时重试。
当数据倾斜严重时,单个分区内存消耗会非常高,可能会导致任务失败。
为了解决这个问题,我们可以采用失败后重试(retry on failure)的方式:
val retryConf = Map(
"spark.sql.shuffle.partitions" -> "800",
"spark.driver.maxResultSize" -> "2g")
val df = spark.read.parquet("data.parquet")
df.groupBy("key").count()
.withStrategy(ExponentialBackoff(lowerBound=1000, upperBound=10000, multiplier=1.5)) // 指数退避策略
.retry(3) {
case e: OutOfMemoryError => true // 重试出内存错误
}
.option(retryConf.toMap)
这里我们做了以下事情:
这种失败后重试的方式有助于:
总的来说,当数据倾斜严重时,我们可以采用失败后重试的方式来加强稳定性。
bottlenecks意为"瓶颈",指的是整体效率受到一个或几个部分的限制。在Spark数据倾斜的场景下,可以举个例子:假设你的数据集中spark
这个单词的数量特别多,占整体数据的80%。然后你对这个数据执行了一个groupBy聚合:Spark会将这个groupBy操作拆分成多个tasks。由于spark
这个单词的数据集中特别多,它可能会被分配给一个task。而其他话数对应的少量数据可能分配给多个tasks。这时,spark
这个单词所分配的那个task,会需要比其他task更长的时间来统计。这个task就变成了整个job的bottlenecks。也就是说,整体job的运行时间被这个统计spark
单词任务所限制,效率受到它的限制。 ↩︎
最佳或最优解,能带来最高效率 ↩︎
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。