赞
踩
如上图所示,在执行shuffle操作时,会根据key进行value的聚合、拉取、输出。相同的key一定会分配到同一个分区内进行处理。如上图,同一个key的values,最后一定是分配到一个reduceTask进行处理的。这样就会导致hello有7个value要处理,world和you只有1个value要处理,很明显当world和you处理完,hello还在进行处理,这样就导致了数据的倾斜。
当现实中业务的数据很大的时候,出现数据倾斜轻则会导致数据处理特别慢(大部分reduce已经执行完任务,而个别reduce任务还要执行很久),重则Spark任务无法执行。
scala> val rdd = sc.parallelize(List("hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello"),3)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd.glom.collect
res0: Array[Array[String]] = Array(Array(hello, hello, hello, hello, hello, hello, hello), Array(hello, hello, hello, hello, hello, hello, hello), Array(hello, hello, hello, hello, hello, hello, hello))
scala> rdd.map((_,1)).glom.collect
res1: Array[Array[(String, Int)]] = Array(Array((hello,1), (hello,1), (hello,1), (hello,1), (hello,1), (hello,1), (hello,1)), Array((hello,1), (hello,1), (hello,1), (hello,1), (hello,1), (hello,1), (hello,1)), Array((hello,1), (hello,1), (hello,1), (hello,1), (hello,1), (hello,1), (hello,1)))
scala> rdd.map((_,1)).groupByKey.glom.collect
res2: Array[Array[(String, Iterable[Int])]] = Array(Array(), Array((hello,CompactBuffer(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1))), Array())
如上代码我们可以看出,我们把rdd进行groupByKey操作之后,相同的key对应的value都分到了同一个分区下,每一个分区对应一个task任务,这样就导致了两个分区没有数据,只有一个分区有数据,这样就造成了数据倾斜,只有RDD在执行shuffle操作时候,才有可能出现输出倾斜。
①处理数据时有一个或多个任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成,说明该reducetask在数据处理量与未完成的reducetask数据处理量相差很大,这也就造成了数据倾斜。
②另一种情况就是其它reduceTask任务都执行完了,只有个别reduceTask会突然报一个OOM、JVM Out Of Memory、内存溢出、task failed、task lost或resubmitting task,反复执行该Job都不行,这就是数据倾斜导致任务无法执行的表现,出现这种情况必须要解决数据倾斜,不然就别提优化了。
数据倾斜只有在执行shuffle操作才会发生,首先可以先在程序中查找用到的哪些算子需要执行shuffle操作,例如:reduceByKey、groupByKey、countByKey、distinct、join等。
然后我们需要查看任务执行的log日志文件,log一般会报是在你的哪一行代码,导致了OOM异常。
如果没有的话,还可以看看log中任务执行到了第几个stage,根据我们编写的代码,分析我stage是如何划分的,这样就能根据log中执行到的stage,找到对应算子执行shuffle操作时候所划分的stage,这样就找到了是哪里发生了数据倾斜。
这也是最直接、最简单、最有效的方法,就是聚合数据源。在平时我们用到的算子中,经常会进行聚合操作,例如,reduceByKey、groupByKey等,他们都是在Spark作业中执行的。一般Spark作业的数据来源于hive表,hdfs(分布式文件存储系统)上存储的数据。
hive就是适合做离线的,晚上凌晨跑的,ETL(extract transform load)即数据的采集、清洗、导入,还有hive sql,去做以上这些事情,从而去形成一个完整的hive中的数据仓库。spark作业中的源hive表,通常也是通过某些hive etl生成的。
数据倾斜,也就意味着某个key对应10万条数据,某些key对应几百条,某些key只对应几十条。这时候我们就可以直接在生成hive表的hive etl中对数据进行聚合。比如按key来分组,将key对应的所有的values全部用一种特殊的格式拼接到一个字符串里面去。这也就意味着,每个key就只对应一条数据。在spark中,就不需要再去执行groupByKey+map这种操作了,直接对每个key对应的values字符串进行map操作,然后再进行我们需要的操作即可。这也就避免了进行shuffle操作。
除了这种方式,另一种数据源聚合的方式就是粗粒化Key。例如有10万条数据包含了多个城市,各个地区,最近一周,每天的天气情况。我们可以直接按照城市的粒度,做聚合操作,把各个地区,最近一周,每天的天气情况都给聚合起来。减少每个key对应的value数量,聚合之后的数据会大幅减少,可能由10万条数据减少到几万条,这样也能尽可能避免因shuffle操作导致的数据倾斜。
根据我们的业务需求和实际情况,我们可以从hive表查询源数据的时候,直接在sql中用where条件过滤掉某几个将导致数据倾斜的key。这样也能避免因shuffle操作导致的数据倾斜。
如果前两个方案都不能解决,则采用这种。
提高shuffle操作reduce并行度,也就是增加reduceTask的数量,这样也能尽可能减小或解决数据倾斜情况的发生。
那么如何提高reduce并行度?
在我们调用shuffle算子(groupByKey、reduceByKey、countByKey等)的时候,传入进去一个参数。那个数字,就代表了那个shuffle操作的reduce端的并行度。那么在进行shuffle操作的时候,就会对应着创建指定数量的reduce task。
例:m.groupByKey(10).glom.collect,即创建了10个reduceTaks
这种方式有时候也不能完全解决数据倾斜的情况,只能使整个Spark工作执行的更快一点,但是还是很慢。这就要采用后面的四种方案了。
使用随机key实现双重聚合,这种方法只针对于groupByKey和reduceByKey。
主要方法就是在第一次聚合前先将每个key前面加一个随机数,这样来打散Key,避免数据倾斜。
进行groupByKey分组后,通过map操作对每个分区内的value进行一次sum求和。
求完和后在通过substring去掉Key的前缀,然后在进行reduceByKey聚合。
另一种方式就是在打散Key之后,进行reduceByKey聚合。
聚合之后在进行substring去掉Key的前缀,然后在进行reduceByKey聚合。
废话不多说,上代码!
scala> val rdd = sc.parallelize(List("hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello","hello"),3) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24 scala> rdd.glom.collect res0: Array[Array[String]] = Array(Array(hello, hello, hello, hello, hello, hello, hello), Array(hello, hello, hello, hello, hello, hello, hello), Array(hello, hello, hello, hello, hello, hello, hello)) scala> rdd.map(x=>{var rd=scala.util.Random.nextInt(10); (rd+"_"+x,1)}).glom.collect res4: Array[Array[(String, Int)]] = Array(Array((8_hello,1), (1_hello,1), (9_hello,1), (3_hello,1), (1_hello,1), (2_hello,1), (5_hello,1)), Array((2_hello,1), (8_hello,1), (9_hello,1), (3_hello,1), (4_hello,1), (6_hello,1), (6_hello,1)), Array((0_hello,1), (7_hello,1), (4_hello,1), (4_hello,1), (0_hello,1), (0_hello,1), (5_hello,1))) scala> rdd.map(x=>{var rd=scala.util.Random.nextInt(10); (rd+"_"+x,1)}).groupByKey.glom.collect res5: Array[Array[(String, Iterable[Int])]] = Array(Array((2_hello,CompactBuffer(1)), (6_hello,CompactBuffer(1, 1, 1, 1)), (9_hello,CompactBuffer(1, 1))), Array((3_hello,CompactBuffer(1, 1)), (0_hello,CompactBuffer(1, 1, 1)), (7_hello,CompactBuffer(1, 1, 1, 1))), Array((4_hello,CompactBuffer(1, 1)), (5_hello,CompactBuffer(1)), (1_hello,CompactBuffer(1)), (8_hello,CompactBuffer(1)))) scala> rdd.map(x=>{var rd=scala.util.Random.nextInt(10); (rd+"_"+x,1)}).groupByKey.map(t=>(t._1,t._2.sum)).glom.collect res6: Array[Array[(String, Int)]] = Array(Array((2_hello,2), (6_hello,1), (9_hello,3)), Array((0_hello,3)), Array((4_hello,2), (5_hello,4), (1_hello,5), (8_hello,1))) scala> rdd.map(x=>{var rd=scala.util.Random.nextInt(10); (rd+"_"+x,1)}).groupByKey.map(t=>(t._1,t._2.sum)).map(t=>(t._1.substring(2),t._2)).glom.collect res7: Array[Array[(String, Int)]] = Array(Array((hello,7), (hello,2), (hello,3)), Array((hello,2), (hello,1)), Array((hello,3), (hello,1), (hello,2))) scala> rdd.map(x=>{var rd=scala.util.Random.nextInt(10); (rd+"_"+x,1)}).groupByKey.map(t=>(t._1,t._2.sum)).map(t=>(t._1.substring(2),t._2)).reduceByKey(_+_).glom.collect res8: Array[Array[(String, Int)]] = Array(Array(), Array((hello,21)), Array())
scala> rdd.map(x=>{var rd=scala.util.Random.nextInt(10); (rd+"_"+x,1)}).glom.collect
res9: Array[Array[(String, Int)]] = Array(Array((6_hello,1), (8_hello,1), (6_hello,1), (3_hello,1), (1_hello,1), (1_hello,1), (9_hello,1)), Array((9_hello,1), (7_hello,1), (3_hello,1), (8_hello,1), (3_hello,1), (5_hello,1), (8_hello,1)), Array((8_hello,1), (8_hello,1), (1_hello,1), (3_hello,1), (8_hello,1), (4_hello,1), (0_hello,1)))
scala> rdd.map(x=>{var rd=scala.util.Random.nextInt(10); (rd+"_"+x,1)}).reduceByKey(_+_).glom.collect
res10: Array[Array[(String, Int)]] = Array(Array((2_hello,2), (6_hello,1), (9_hello,3)), Array((3_hello,1), (0_hello,2), (7_hello,1)), Array((4_hello,4), (5_hello,2), (1_hello,4), (8_hello,1)))
scala> rdd.map(x=>{var rd=scala.util.Random.nextInt(10); (rd+"_"+x,1)}).reduceByKey(_+_).map(x=>(x._1.substring(2),x._2)).glom.collect
res11: Array[Array[(String, Int)]] = Array(Array((hello,5), (hello,2), (hello,3)), Array((hello,2), (hello,1)), Array((hello,3), (hello,2), (hello,1), (hello,2)))
scala> rdd.map(x=>{var rd=scala.util.Random.nextInt(10); (rd+"_"+x,1)}).reduceByKey(_+_).map(x=>(x._1.substring(2),x._2)).reduceByKey(_+_).glom.collect
res12: Array[Array[(String, Int)]] = Array(Array(), Array((hello,21)), Array())
将reduce join转换为map join。普通的join,那么肯定是要进行shuffle操作。既然是走shuffle,那么普通的join就肯定是走的是reduce join。
那么我们就先将所有相同的key对应的value汇聚到一个task中,然后再进行join。
那么这种方案适用于什么情况呢?
如果两个RDD要进行join,其中一个RDD是比较小的。比如一个RDD是100万数据,一个RDD是1万数据。我们可以将小的RDD数据作为广播变量broadcast广播出去,这样就可以缓存到每个节点(Executor)上。前提要保证有足够的内存存放广播变量。这样就从根本上解决了join操作可能导致的数据倾斜的问题。
但是当两个RDD数据量都很大的时候,就不能用这种方式了,那样很容易会导致内存溢出。所以,我们只能使用map join的方式,牺牲一点内存资源。在可行的情况下,优先这么使用。
sample采样倾斜的key单独进行join。
主要就是将发生数据倾斜的key,单独拉出来,放到一个RDD中去。就用这个原本会倾斜的key RDD跟其他的RDD单独去join一下,这个时候key对应的数据可能就会分散到多个task中去进行join操作。就不至于说是,这个key跟之前其他的key混合在一个RDD中时,肯定是会导致一个key对应的所有数据都到一个task中去,就会导致数据倾斜。
那么什么情况下适合使用这种方案呢?
优先对于join,肯定是希望能够采用方案五,针对与我们的RDD的数据,我们可以把它转换成一个中间表,或者是直接用countByKey的方式,查看这个RDD各个key对应的数据量。
此时如果你发现整个RDD就一个或者少数几个key对应的数据量特别多。此时建议采用这种方案,单拉出来那个最多的key,单独进行join,尽可能地将key分散到各个task上去进行join操作。
如果你发现整个RDD中有多个key对应的数据量都特别多,此时,只能将数据量多的key拉取出来,然后进行一个优化操作。从另外一个要join的表中,也过滤出来一份数据,比如某个key可能就只有一条数据。
然后我们再对那个只有一条数据的RDD,进行flatMap操作,打上100个随机数,作为前缀,返回100条数据。
然后再将刚刚拉取出来的key对应的数据量特别多的RDD,给每一条数据,都打上一个100以内的随机数,作为前缀。然后就可以进行join操作了,join完以后,执行map操作将之前打上的随机数给去掉,然后再和另外一个普通RDD join以后的结果再进行union操作。
使用随机数以及扩容表进行join。
这个方案是没办法彻底解决数据倾斜的,只是一种对数据倾斜的缓解。
1、选择一个RDD,要用flatMap,进行扩容,将每条数据,映射为多条数据,每个映射出来的数据,都带了一个n以内的随机数,通常来说会选择10。
2、将另外一个RDD,做普通的map映射操作,每条数据都打上一个10以内的随机数。
3、最后将两个处理后的RDD进行join操作。
4、因为两个RDD都很大,所以你没有办法去将某一个RDD扩的特别大,一般就是10倍。
5、如果就是10倍的话,那么数据倾斜问题的确是只能说是缓解和减轻,不能说彻底解决。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。