赞
踩
一.org.apache.spark.shuffle.FetchFailedException
1.问题描述
这种问题一般发生在有大量shuffle操作的时候,task不断的failed,然后又重执行,一直循环下去,非常的耗时。
2.报错提示
(1) missing output location
(2) shuffle fetch faild
当前的配置为每个executor使用1cpu,5GRAM,启动了20个executor
3.解决方案
一般遇到这种问题提高executor内存即可,同时增加每个executor的cpu,这样不会减少task并行度。
启动的execuote数量为:7个
每个executor的配置:
消耗的内存资源为:105G RAM
可以发现使用的资源并没有提升,但是同样的任务原来的配置跑几个小时还在卡着,改了配置后几分钟就结束了。
二.Executor&Task Lost
1.问题描述
因为网络或者gc的原因,worker或executor没有接收到executor或task的心跳反馈
2.报错提示
(1) executor lost
(2) task lost
(3) 各种timeout
3.解决方案
提高 spark.network.timeout 的值,根据情况改成300(5min)或更高。
默认为 120(120s),配置所有网络传输的延时,如果没有主动设置以下参数,默认覆盖其属性
三.倾斜
1.问题描述
大多数任务都完成了,还有那么一两个任务怎么都跑不完或者跑的很慢。
分为数据倾斜和task倾斜两种。
2.错误提示
(1) 数据倾斜
(2) 任务倾斜
差距不大的几个task,有的运行速度特别慢。
3.解决方案
(1) 数据倾斜
数据倾斜大多数情况是由于大量null值或者""引起,在计算前过滤掉这些数据既可。
例如:
(2) 任务倾斜
task倾斜原因比较多,网络io,cpu,mem都有可能造成这个节点上的任务执行缓慢,可以去看该节点的性能监控来分析原因。以前遇到过同事在spark的一台worker上跑R的任务导致该节点spark task运行缓慢。
或者可以开启spark的推测机制,开启推测机制后如果某一台机器的几个task特别慢,推测机制会将任务分配到其他机器执行,最后Spark会选取最快的作为最终结果。
spark.speculation true
spark.speculation.interval 100 - 检测周期,单位毫秒;
spark.speculation.quantile 0.75 - 完成task的百分比时启动推测
spark.speculation.multiplier 1.5 - 比其他的慢多少倍时启动推测。
四.OOM(内存溢出)
1.问题描述
内存不够,数据太多就会抛出OOM的Exeception
因为报错提示很明显,这里就不给报错提示了。。。
2.解决方案
主要有driver OOM和executor OOM两种
(1) driver OOM
一般是使用了collect操作将所有executor的数据聚合到driver导致。尽量不要使用collect操作即可。
(2) executor OOM
1.可以按下面的内存优化的方法增加code使用内存空间
2.增加executor内存总量,也就是说增加spark.executor.memory的值
3.增加任务并行度(大任务就被分成小任务了),参考下面优化并行度的方法
优化
1.内存
当然如果你的任务shuffle量特别大,同时rdd缓存比较少可以更改下面的参数进一步提高任务运行速度。
spark.storage.memoryFraction - 分配给rdd缓存的比例,默认为0.6(60%),如果缓存的数据较少可以降低该值。
spark.shuffle.memoryFraction - 分配给shuffle数据的内存比例,默认为0.2(20%)
剩下的20%内存空间则是分配给代码生成对象等。
如果任务运行缓慢,jvm进行频繁gc或者内存空间不足,或者可以降低上述的两个值。
"spark.rdd.compress","true" - 默认为false,压缩序列化的RDD分区,消耗一些cpu减少空间的使用
如果数据只使用一次,不要采用cache操作,因为并不会提高运行速度,还会造成内存浪费。
2.并行度
发生shuffle时的并行度,在standalone模式下的数量默认为core的个数,也可手动调整,数量设置太大会造成很多小任务,增加启动任务的开销,太小,运行大数据量的任务时速度缓慢。
sql聚合操作(发生shuffle)时的并行度,默认为200,如果任务运行缓慢增加这个值。
相同的两个任务:
速度变快主要是大量的减少了gc的时间。
修改map阶段并行度主要是在代码中使用rdd.repartition(partitionNum)来操作。
========================================================================================
五、踩坑SPARK之容错机制
2018-04-15 14
SPARK的容错
这块机制其实还不是太明白,很多都是看的这位兄弟的博客,这里说说今天遇到的问题以及踩到的坑。
问题
最近在调一个spark程序,因为数据量太大,存在一些性能障碍。之前join的问题已经解决(过两天把这个方案补上)。一直以为这样就解决问题了,但通过新数据的测试,发现耗时仍然可能特别严重。一个问题是几个任务的GC时间过长,导致整体运行时间特别长(这个问题没有得到复现,如果再次遇到的话,目前只能通过一些已有GC方案解决);另一个问题是,即使没有大的GC耗时,计算时间依旧很感人(一个任务大约要4~5h,是不大能接受的)。
为了加速任务,在队列资源不是特别紧张的前提下,我决定加一些机器。具体做法就是加num-executors, executor-memory以及executor-cores,然后把default-parallelism增大。一开始观察,速度的确有加快的样子,估计可以提速一倍(也是应该的,毕竟资源也加了一倍左右)。但是当任务跑到1/4的时候,突然出现了一个意外:一个executor突然挂了!
SPARK的任务重启
之前没有仔细研究过,executor挂了spark会怎么处理。因为毕竟已经跑了一些结果了,总不能从头开始再重跑一遍吧。
首先我们来看一下driver日志报的问题:
FetchFailed(BlockManagerId(301, some_port, 7391, None), shuffleId=4, mapId=69, reduceId=1579, message=org.apache.spark.shuffle.FetchFailedException: Failed to connect to some_port
原来是某个exetutor想要fetch数据(应该是shuffle read),但那个有数据的executor挂了,导致fetch失败。为啥我知道executor挂了?我是通过spark-ui看到的。
我们想一下,那个executor挂了的话,有什么后果?那个executor上存着上个stage算好的数据,然后这个stage的任务会依赖那些数据,所以这个会影响到很多这个stage的任务。
下面分三个角度看这个stage的任务:这个stage已经算好的任务,应该是不需要重新计算的;这个stage未启动的任务暂时不受到影响;这个stage已经启动但未完成的任务是什么影响呢?这个我们稍后再说。
先看spark在知道executor挂了之后做了些什么事情?假设我们当前的stage是9号stage,默认叫做9.0;现在因为那个executor挂了,这个stage不能顺利继续下去了。所以,spark就重启一个新的stage,叫做9.1。由于已经算好的就不要算了,所以任务数量就是之前的总量减去已经计算完成的数量。对于9.0已经启动但未完成的任务,9.1仍然会重启,但似乎二者之前没有进行沟通。
下面来看,那个executor已经算好的数据现在丢失了,spark要怎么做?由于spark的rdd之间的“血缘”关系,可以根据那个executor上rdd的生成方法,再重新算一遍就好了。这个只涉及到那个executor上的数据,所以开销会很小,但有可能会重新计算多个stage(我遇到的是分钟级别的重新计算)。
讲道理,有了这个容错重启任务的机制,分钟级别的重算不会带来很大的额外时间开销的。但通过spark ui观察,在那个9.0的时候,并行的任务有近1000个,现在9.1的并行任务只剩下300~400个,速度变得很慢,加了资源就跟没加一样,这怎么能忍?
回头看一下executor的log,积累了一段时间发现,很多executor一直在报这个:
java.io.IOException: Connection from some_port closed18/04/15 08:15:09 INFO RetryingBlockFetcher: Retrying fetch (1/30) for 20 outstanding blocks after 10000 ms18/04/15 08:15:09 ERROR OneForOneBlockFetcher: Failed while starting block fetchesjava.io.IOException: Connection from some_port closed18/04/15 08:15:09 INFO RetryingBlockFetcher: Retrying fetch (1/30) for 20 outstanding blocks after 10000 ms18/04/15 08:15:09 ERROR OneForOneBlockFetcher: Failed while starting block fetchesjava.io.IOException: Connection from some_port closed18/04/15 08:15:09 INFO RetryingBlockFetcher: Retrying fetch (1/30) for 20 outstanding blocks after 10000 ms18/04/15 08:15:19 INFO TransportClientFactory: Found inactive connection to some_port, creating a new one.18/04/15 08:17:26 INFO TransportClientFactory: Found inactive connection to some_port, creating a new one.18/04/15 08:17:26 ERROR RetryingBlockFetcher: Exception while beginning fetch of 20 outstanding blocks (after 1 retries)
闲着无聊的我,就这样看了两个小时,这个错才消停。仔细观察,好像这个在尝试30次fetch数据。不过fetch的数据源就是那个已经挂了的executor,既然已经挂了,还一直在那儿尝试,不是有毛病嘛。
另外一个问题就是,为毛要重试30次这么多?感觉这应该是个配置,那就在spark ui的environment的tab里面搜一搜30。哈哈,果然有个30个在那儿:
spark.shuffle.io.maxRetries: 30
看着名字,应该就是它了!spark一直在尝试fetch那个挂了的executor上的数据,一直要尝试30次!然后还有一个对应的参数:spark.shuffle.io.retryWait=10s,这个表示两次retry之间的间隔。知道这个问题后,查了一下文档,发现官方默认的retry次数是3次,不知道哪个运维把默认参数改成了30!还应该挨千刀的是,retryWait也从默认的5s改成了10s。跑得慢的原因很明显了,就是这两个参数,导致很多executor在进行无谓的挣扎,想要从一个挂了的executor上取数,也就是两个小时,一半以上的executor的资源都浪费了。
但是稍等,讲道理30次乘以10s,最多也就浪费300s,也就是5min,怎么会浪费2h呢?这里我猜想:
18/04/15 08:15:19 INFO TransportClientFactory: Found inactive connection to some_port, creating a new one.
这里应该是一个提示,当我发现那个executor没法连接上的时候,就想着重新建立一个连接。但毕竟那个节点已经挂了,必然一直没有回应,那就需要等待连接超时。连接超时时间很长,比如是5min,那算下来这个时间也就差不多要两个小时了。
SPARK 2.1.0的坑
那么问题又来了,spark怎么这么傻?明明那个exector挂了,还是要做尝试。难道driver不能告知每个executor:那个挂了,不要去那里取数了,已经起了的任务就结束吧。通过查询多方资料才知道,原来早先设计的人好像没有考虑到这一点。下面是一些jira和github的issue,都是在吐槽这个问题:
https://issues.apache.org/jira/browse/SPARK-20178https://issues.apache.org/jira/browse/SPARK-20230https://github.com/apache/spark/pull/17088
看样子是17年5月才close掉这个问题,所以可能要到spark 2.3.0里面才会修复这个问题。也算是踩了一次2.1.0版本spark的坑,不过公司里spark是不能随意升级的,以后还是需要手动处理这个问题,比如自己设置retry次数。不过一个可能的坑是,这个参数的描述是:
This retry logic helps stabilize large shuffles in the face of long GC pauses or transient network connectivity issues.
所以GC如果是个问题的话,可能还要往上调。
六、Spark性能调优之合理设置并行度
1.Spark的并行度指的是什么?
spark作业中,各个stage的task的数量,也就代表了spark作业在各个阶段stage的并行度!
当分配完所能分配的最大资源了,然后对应资源去调节程序的并行度,如果并行度没有与资源相匹配,那么导致你分配下去的资源都浪费掉了。同时并行运行,还可以让每个task要处理的数量变少(很简单的原理。合理设置并行度,可以充分利用集群资源,减少每个task处理数据量,而增加性能加快运行速度。
举例:
假如, 现在已经在spark-submit 脚本里面,给我们的spark作业分配了足够多的资源,比如50个executor ,每个executor 有10G内存,每个executor有3个cpu core 。 基本已经达到了集群或者yarn队列的资源上限。
task没有设置,或者设置的很少,比如就设置了,100个task 。 50个executor ,每个executor 有3个core ,也就是说
Application 任何一个stage运行的时候,都有总数150个cpu core ,可以并行运行。但是,你现在只有100个task ,平均分配一下,每个executor 分配到2个task,ok,那么同时在运行的task,只有100个task,每个executor 只会并行运行 2个task。 每个executor 剩下的一个cpu core 就浪费掉了!你的资源,虽然分配充足了,但是问题是, 并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。合理的并行度的设置,应该要设置的足够大,大到可以完全合理的利用你的集群资源; 比如上面的例子,总共集群有150个cpu core ,可以并行运行150个task。那么你就应该将你的Application 的并行度,至少设置成150个,才能完全有效的利用你的集群资源,让150个task ,并行执行,而且task增加到150个以后,即可以同时并行运行,还可以让每个task要处理的数量变少; 比如总共 150G 的数据要处理, 如果是100个task ,每个task 要计算1.5G的数据。 现在增加到150个task,每个task只要处理1G数据。
2.如何去提高并行度?
1、task数量,至少设置成与spark Application 的总cpu core 数量相同(最理性情况,150个core,分配150task,一起运行,差不多同一时间运行完毕)官方推荐,task数量,设置成spark Application 总cpu core数量的2~3倍 ,比如150个cpu core ,基本设置 task数量为 300~ 500. 与理性情况不同的,有些task 会运行快一点,比如50s 就完了,有些task 可能会慢一点,要一分半才运行完,所以如果你的task数量,刚好设置的跟cpu core 数量相同,可能会导致资源的浪费,因为 比如150task ,10个先运行完了,剩余140个还在运行,但是这个时候,就有10个cpu core空闲出来了,导致浪费。如果设置2~3倍,那么一个task运行完以后,另外一个task马上补上来,尽量让cpu core不要空闲。同时尽量提升spark运行效率和速度。提升性能。
2、如何设置一个Spark Application的并行度?
spark.defalut.parallelism 默认是没有值的,如果设置了值比如说10,是在shuffle的过程才会起作用(val rdd2 = rdd1.reduceByKey(_+_) //rdd2的分区数就是10,rdd1的分区数不受这个参数的影响)
new SparkConf().set(“spark.defalut.parallelism”,”“500)
3、如果读取的数据在HDFS上,增加block数,默认情况下split与block是一对一的,而split又与RDD中的partition对应,所以增加了block数,也就提高了并行度。
4、RDD.repartition,给RDD重新设置partition的数量
5、reduceByKey的算子指定partition的数量
val rdd2 = rdd1.reduceByKey(_+_,10) val rdd3 = rdd2.map.filter.reduceByKey(_+_)
6、val rdd3 = rdd1.join(rdd2) rdd3里面partiiton的数量是由父RDD中最多的partition数量来决定,因此使用join算子的时候,增加父RDD中partition的数量。
7、spark.sql.shuffle.partitions //spark sql中shuffle过程中partitions的数量
七、自己总结:
1、项目当中遇到的问题以及解决方式:
一开始设置的查询mysql的分割并行查询是8,结果就遇到了程序反复移除和重新生成executor,但是任务不能完成,一个晚上还没有跑完。后来在clouder manager里面查看yarn的监控。发现其每个nodemanager的分配的executor非常不均匀。然后计算了一下executor的总数是8个。
最终才把mysql的查询并行设置成30(结果很快任务就跑完了,估计是并行度增加每个executor分到的数据少了,没有gc了,也就不存在反复重试,重试不行还重启executor)。然后再yarn里面看到是启动了30个container。有15个是同时运行的。就是一个container(包含executor)才一个cpu核心,对应的每个executor的内存也没有我设置的大。
请参照下面这个图。这个是我的设置,说明它没有参照我的设置进行资源分配,而是根据查询mysql的并行度进行了资源分配。估计是根据mysql的并行度,产生任务、数据分区,然后是计算。我原来以为是按照默认的并行度64进行的;所以一直没有设置并行度(spark.default.parallelism)这个参数。万万没想到它是根据mysql的查询并行度进行并行的。
2、出现问题的原理分析
综合二.Executor&Task Lost 、 五、踩坑SPARK之容错机制
可以总结如下:
A、当数据集超大时(或者是分配不均匀或者分区太少、并行度不够等导致的单个executor内存不够),会造成executor内存不够,频繁gc。
B、频繁的gc或者网络抖动,会造成数据传输超时、心跳超时等问题。
C、由于spark的重试机制,会先根据配置的时间间隔,再次去重试拉取数据。
D、超过重试次数之后,executor会被干掉,重新生成一个executor去重新执行。这样就导致了反复的remove掉executor,然后重新生成。但是任务还是不能完成。
3、对spark的重试机制的参数进行设置(尝试次数、尝试间隔、还有各种通信超时时间)
每次尝试失败都是要等到通信超时,各种时间加起来,反复重试时间会很长
A、
spark.shuffle.io.maxRetries: 30 #尝试次数
spark一直在尝试fetch那个挂了的executor上的数据,一直要尝试30次!
B、
spark.shuffle.io.retryWait=10s #这个表示两次retry之间的间隔。
C、
spark.network.timeout=300 #配置所有网络传输的延时
如果没有主动设置以下参数,默认覆盖其属性
【转载】:https://www.jianshu.com/p/a0c38dc46b89
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。