赞
踩
绝大多数 task 执行得都非常快,但个别 task 执行极慢。
在进行 shuffle 的时候,必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处理,比如按照 key 进行聚合或 join 等操 作。此时如果某个 key 对应的数据量特别大的话,就会发生数据倾斜。 因此出现数据倾斜的时候,Spark 作业看起来会运行得非常缓慢,甚至可能因为 某个 task 处理的数据量过大导致内存溢出。而整个 stage 的运行速度也 由运行最慢的那个 task 所决定。 10条 / 100 万条
数据倾斜只会发生在 shuffle 过程中。 可能会触 发 shuffle 操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、 join、cogroup、repartition 等。
就是数据倾斜发生在第几个 stage 中。 用 yarn-client 模式 提交,那么本地是直接可以看到 log 的,可以在 log 中找到当前运行到了第几个 stage, 用 yarn-cluster 模式提交,则可以通过 Spark Web UI 来查看当前运行到 了第几个 stage。
在 Spark Web UI 上深入看一下当前这个 stage 各个 task 分配的数据量,从而进一 步确定是不是 task 分配的数据不均匀导致了数据倾斜。
知道数据倾斜发生在哪一个 stage 之后,接着我们就需要根据 stage 划分原理, 推算出来发生倾斜的那个 stage 对应代码中的哪一部分,这部分代码中肯定会有 一个 shuffle 类算子。一般shuffle类算子会切分stage。
看 yarn-client 模式下本地 log 的异常栈,或者是通过 YARN 查看 yarn-cluster 模式下的 log 中的异常栈。一般来说,通过异常栈信息就可以定位到代码中哪一行发生了 内存溢出。然后在那行代码附近找找,一般也会有 shuffle 类算子,此时很可能 就是这个算子导致了数据倾斜。 但是内存溢出不一定就是发生了数据倾斜,也有可能是代码存在bug。
查看导致数据倾斜的 key 的数据分布情况
知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了 shuffle 操作并 且导致了数据倾斜的 RDD/Hive 表,查看一下其中 key 的分布情况。这主要是为 之后选择哪一种技术方案提供依据。
Hive ETL 预先对数据按照 key 进行聚合,或者是预先和其他表进行 join, 然后在 Spark 作业中针对的数据源就不是原来的 Hive 表了,而是预处理后的 Hive 表。此时由于数据已经预先进行过聚合或 join 操作了,那么在 Spark 作业 中也就不需要使用原先的 shuffle 类算子执行这类操作了。
这种方案从根源上解决了数据倾斜,因为彻底避免了在 Spark 中 执行 shuffle 类算子,那么肯定就不会有数据倾斜的问题了。 但是指标不治本,Hive ETL也会发生数据倾斜。
少数几个数据量特别多的 key,对作业的执行和 计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个 key。 将导致数据倾斜的 key 给过滤掉之后,这些 key 就不会参与计算 了,自然不可能产生数据倾斜。
对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度,该值默认是 200,对于很多场景来说都有点过小。 增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task,从而让每个 task 处理比原来更少的数据 。
缺点:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况, 比如某个 key 对应的数据量有 100 万,那么无论你的 task 数量增加到多少,这 个对应着 100 万数据的 key 肯定还是会分配到一个 task 中去处理,因此注定还 是会发生数据倾斜的。
这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚 合,先给每个 key 都打上一个随机数,比如 10 以内的随机数,此时原先一样的 key 就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1), 就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打 上随机数后的数据,执行 reduceByKey 等聚合操作,进行局部聚合,那么局部聚 合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个 key 的前缀给去 掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结 果了,比如(hello, 4)。
将原本相同的 key 通过附加随机前缀的方式,变成多个不同的 key, 就可以让原本被一个 task 处理的数据分散到多个 task 上去做局部聚合,进而解 决单个 task 处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合, 就可以得到最终的结果。
将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的 内存中来,然后对其创建一个 Broadcast 变量;接着对另外一个 RDD 执行 map 类算子,在算子函数内,从 Broadcast 变量中获取较小 RDD 的全量数据,与当前 RDD 的每一条数据按照连接 key 进行比对,如果连接 key 相同的话,那么就将两 个 RDD 的数据用你需要的方式连接起来。 普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于 会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。但是如果一个 RDD 是比较小的,则可以采用广播小 RDD 全量数据 +map 算子来实现与 join 同样的效果,也就是 map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。
《五分钟学大数据-Spark数据倾斜及解决方案》
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。