赞
踩
首先,要搞清楚 Spark 的几个基本概念和原则,否则系统的性能调优无从谈起:
每一台 host 上面可以并行 N 个 worker,每一个 worker 下面可以并行 M 个 executor,task 们会被分配到 executor 上面去执行。stage 指的是一组并行运行的 task,stage 内部是不能出现 shuffle 的,因为 shuffle 就像篱笆一样阻止了并行 task 的运行,遇到 shuffle 就意味着到了 stage 的边界。
CPU 的 core 数量,每个 executor 可以占用一个或多个 core,可以通过观察 CPU 的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个 executor 占用了多个 core,但是总的 CPU 使用率却不高(因为一个 executor 并不总能充分利用多核的能力),这个时候可以考虑让一个 executor 占用更少的 core,同时 worker 下面增加更多的 executor,或者一台 host 上面增加更多的 worker 来增加并行执行的 executor 的数量,从而增加 CPU 利用率。但是增加 executor 的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的 executor,每个 executor 的内存就越小,以致出现过多的数据 spill over 甚至 out of memory 的情况。
partition 和 parallelism,partition 指的就是数据分片的数量,每一次 task 只能处理一个 partition 的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多 executor 的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行 action 类型操作的时候(比如各种 reduce 操作),partition 的数量会选择 parent RDD 中最大的那一个。而 parallelism 则指的是在 RDD 进行 reduce 类操作的时候,默认返回数据的 paritition 数量(而在进行 map 类操作的时候,partition 数量通常取自 parent RDD 中较大的一个,而且也不会涉及 shuffle,因此这个 parallelism 的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism
可以设置默认的分片数量,而很多 RDD 的操作都可以指定一个 partition 参数来显式控制具体的分片数量。
看这样几个例子:
(1)实践中跑的 Spark job,有的特别慢,查看 CPU 利用率很低,可以尝试减少每个 executor 占用 CPU core 的数量,增加并行的 executor 数量,同时配合增加分片,整体上增加了 CPU 的利用率,加快数据处理速度。
(2)发现某 job 很容易发生内存溢出,我们就增大分片数量,从而减少了每片数据的规模,同时还减少并行的 executor 数量,这样相同的内存资源分配给数量更少的 executor,相当于增加了每个 task 的内存分配,这样运行速度可能慢了些,但是总比 OOM 强。
(3)数据量特别少,有大量的小文件生成,就减少文件分片,没必要创建那么多 task,这种情况,如果只是最原始的 input 比较小,一般都能被注意到;但是,如果是在运算过程中,比如应用某个 reduceBy 或者某个 filter 以后,数据大量减少,这种低效情况就很少被留意到。
最后再补充一点,随着参数和配置的变化,性能的瓶颈是变化的
,在分析问题的时候不要忘记。例如在每台机器上部署的 executor 数量增加的时候,性能一开始是增加的,同时也观察到 CPU 的平均使用率在增加;但是随着单台机器上的 executor 越来越多,性能下降了,因为随着 executor 的数量增加,被分配到每个 executor 的内存数量减小,在内存里直接操作的越来越少,spill over 到磁盘上的数据越来越多,自然性能就变差了。
下面给这样一个直观的例子,当前总的 cpu 利用率并不高:
但是经过根据上述原则的的调整之后,可以显著发现 cpu 总利用率增加了:
其次,涉及性能调优我们经常要改配置,在 Spark 里面有三种常见的配置方式,虽然有些参数的配置是可以互相替代,但是作为最佳实践,还是需要遵循不同的情形下使用不同的配置:
1、设置环境变量,这种方式主要用于和环境、硬件相关的配置;
2、命令行参数,这种方式主要用于不同次的运行会发生变化的参数,用双横线开头;
3、代码里面(比如 Scala)显式设置(SparkConf 对象),这种配置通常是 application 级别的配置,一般不改变。
举一个配置的具体例子。slave、worker 和 executor 之间的比例调整。我们经常需要调整并行的 executor 的数量,那么简单说有两种方式:
• 1、每个 worker 内始终跑一个 executor,但是调整单台 slave 上并行的 worker 的数量。比如,SPARK_WORKER_INSTANCES
可以设置每个 slave 的 worker 的数量,但是在改变这个参数的时候,比如改成 2,一定要相应设置SPARK_WORKER_CORES
的值,让每个 worker 使用原有一半的 core,这样才能让两个 worker 一同工作;
• 2、每台 slave 内始终只部署一个 worker,但是 worker 内部署多个 executor。我们是在 YARN 框架下采用这个调整来实现 executor 数量改变的,一种典型办法是,一个 host 只跑一个 worker,然后配置spark.executor.cores
为 host 上 CPU core 的 N 分之一,同时也设置spark.executor.memory
为 host 上分配给 Spark 计算内存的 N 分之一,这样这个 host 上就能够启动 N 个 executor。
有的配置在不同的 MR 框架/工具下是不一样的,比如 YARN 下有的参数的默认取值就不同,这点需要注意。
明确这些基础的事情以后,再来一项一项看性能调优的要点。
Spark Web UI
Spark 提供了一些基本的 Web 监控页面,对于日常监控十分有用。
通过 http://hadoop102:4040(默认端口是 4040,可以通过 spark.ui.port
修改)我们可以获得运行中的程序信息,如下:
(1)stages 和 tasks 调度情况;
(2)RDD 大小及内存使用;
(3)系统环境信息;
(4)正在执行的 executor 信息。
如果想当 Spark 应用退出后,仍可以获得历史 Spark 应用的 stages 和 tasks 执行信息,便于分析程序不明原因挂掉的情况。可以开启 History Server。配置方法如下:
(1)$SPARK_HOME/conf/spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.retainedApplications=50
Dspark.history.fs.logDirectory=hdfs://hadoop102:9000/directory"
说明:
spark.history.retainedApplica-tions #仅显示最近50个应用
spark.history.fs.logDirectory #Spark History Server 页面只展示该路径下的信息
(2)$SPARK_HOME/conf/spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop102:9000/directory #应用在运行过程中所有的信息均记录在该属性指定的路径下
spark.eventLog.compress true
(3)HistoryServer 启动
$SPARK_HOMR/bin/start-histrory-server.sh
(4)HistoryServer 停止
$SPARK_HOMR/bin/stop-histrory-server.sh
同时 Executor 的 logs 也是查看的一个出处:
• Standalone 模式:$SPARK_HOME/logs
• YARN 模式:在 yarn-site.xml 文件中配置了 YARN 日志的存放位置:yarn.nodemanager.log-dirs
,或使用命令获取 yarn logs -applicationId
。
同时通过配置 ganglia,可以分析集群的使用状况和资源瓶颈,但是默认情况下 ganglia 是未被打包的,需要在 mvn 编译时添加 -Pspark-ganglia-lgpl
,并修改配置文件 $SPARK_HOME/conf/metrics.properties
。
参考文章链接:https://www.cnblogs.com/chenmingjun/p/10745505.html#_label1_4
其他监控工具
• Nmon(http://nmon.sourceforge.net/pmwiki.php)
Nmon:输入,c:CPU ,n:网络 ,m:内存 ,d:磁盘
• Jmeter(http://jmeter.apache.org/)
通常使用 Jmeter 做系统性能参数的实时展示,JMeter 的安装非常简单,从官方网站上下载,解压之后即可使用。运行命令在 %JMETER_HOME%/bin
下,对于 Windows 用户,直接使用 jmeter.bat 即可。
启动 jmeter:创建测试计划,设置线程组设置循环次数。
添加监听器:jp@gc - PerfMon Metrics Collector
。
设置监听器:监听主机端口及监听内容,例如 CPU。
启动监听:可以实时获得节点的 CPU 状态信息,从下图可看出 CPU 已出现瓶颈。
• Jprofiler(http://www.ej-technologies.com/products/jprofiler/overview.html)
JProfiler 是一个全功能的 Java 剖析工具(profiler),专用于分析 J2SE 和 J2EE 应用程式。它把 CPU、线程和内存的剖析组合在一个强大的应用中。JProfiler 的 GUI 可以更方便地找到性能瓶颈、抓住内存泄漏(memory leaks),并解决多线程的问题。例如分析哪个对象占用的内存比较多;哪个方法占用较大的 CPU 资源等;我们通常使用 Jprofiler 来监控 Spark 应用在 local 模式下运行时的性能瓶颈和内存泄漏情况。
内存调整要点
Memory Tuning,Java 对象会占用原始数据 2~5 倍甚至更多的空间。
最好的检测对象内存消耗的办法就是创建 RDD,然后放到 cache 里面去,然后在 UI 上面看 storage 的变化。使用-XX:+UseCompressedOops
选项可以压缩指针(8 字节变成 4 字节)。在调用 collect 等 API 的时候也要小心–大块数据往内存拷贝的时候心里要清楚。内存要留一些给操作系统,比如 20%,这里面也包括了 OS 的 buffercache,如果预留得太少了,会见到这样的错误:
“Required executor memory (235520+23552 MB) is above the max threshold (241664MB) of this cluster! Please increase the value of ‘yarn.scheduler.maximum-allocation-mb’.
或者干脆就没有这样的错误,但是依然有因为内存不足导致的问题,有的会有警告,比如这个:
“16/01/13 23:54:48 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
有的时候连这样的日志都见不到,而是见到一些不清楚原因的 executor 丢失信息:
“Exception in thread “main” org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 17.0 failed 4 times, most recent failure: Lost task 12.3 in stage 17.0 (TID 1257, ip-10-184-192-56.ec2.internal): ExecutorLostFailure (executor 79 lost)
Reduce Task 的内存使用。
在某些情况下 reduce task 特别消耗内存,比如当 shuffle 出现的时候,比如 sortByKey、groupByKey、reduceByKey 和 join 等,要在内存里面建立一个巨大的 hash table。其中一个解决办法是增大 level of parallelism,这样每个 task 的输入规模就相应减小。另外,注意 shuffle 的内存上限设置,有时候有足够的内存,但是 shuffle 内存不够的话,性能也是上不去的。我们在有大量数据 join 等操作的时候,shuffle 的内存上限经常配置到 executor 的 50%。
注意原始 input 的大小,有很多操作始终都是需要某类全集数据在内存里面完成的,那么并非拼命增加 parallelism 和 partition 的值就可以把内存占用减得非常小的。我们遇到过某些性能低下甚至 OOM 的问题,是改变这两个参数所难以缓解的。但是可以通过增加每台机器的内存,或者增加机器的数量都可以直接或间接增加内存总量来解决。
另外,有一些 RDD 的 API,比如 cache、persist,都会把数据强制放到内存里面,如果并不明确这样做带来的好处,就不要用它们。
内存优化有三个方面的考虑:对象所占用的内存、访问对象的消耗以及垃圾回收所占用的开销。
1、对象所占内存,优化数据结构
Spark 默认使用 Java 序列化对象,虽然 Java 对象的访问速度更快,但其占用的空间通常比其内部的属性数据大2-5倍。为了减少内存的使用,减少 Java 序列化后的额外开销,下面列举一些 Spark 官网提供的方法。
(1)使用对象数组以及原始类型(primitive type)数组以替代 Java 或者 Scala 集合类(collection class)。fastutil 库为原始数据类型提供了非常方便的集合类,且兼容 Java 标准类库。
(2)尽可能地避免采用含有指针的嵌套数据结构来保存小对象。
(3)考虑采用数字 ID 或者枚举类型以便替代 String 类型的主键。
(4)如果内存少于 32GB,设置 JVM 参数-XX:+UseCom-pressedOops
以便将 8 字节指针修改成 4 字节。与此同时,在 Java 7 或者更高版本,设置 JVM 参数-XX:+UseCompressedStrings
以便采用 8 比特来编码每一个 ASCII 字符。
2、内存回收
(1)获取内存统计信息:优化内存前需要了解集群的内存回收频率、内存回收耗费时间等信息,可以在spark-env.sh
中设置SPARK_JAVA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps $ SPARK_JAVA_OPTS"
来获取每一次内存回收的信息。
(2)优化缓存大小:默认情况 Spark 采用运行内存(spark.executor.memory)的 60% 来进行 RDD 缓存。这表明在任务执行期间,有 40% 的内存可以用来进行对象创建。如果任务运行速度变慢且 JVM 频繁进行内存回收,或者内存空间不足,那么降低缓存大小设置可以减少内存消耗,可以降低spark.storage.memoryFraction
的大小。
3、频繁 GC 或者 OOM
针对这种情况,首先要确定现象是发生在 Driver 端还是在 Executor 端,然后在分别处理。
Driver 端:通常由于计算过大的结果集被回收到 Driver 端导致,需要调大 Driver 端的内存解决,或者进一步减少结果集的数量。
Executor 端:
(1)以外部数据作为输入的 Stage:这类 Stage 中出现 GC 通常是因为在 Map 侧进行 map-side-combine 时,由于 group 过多引起的。解决方法可以增加 partition 的数量(即 task 的数量)来减少每个 task 要处理的数据,来减少 GC 的可能性。
(2)以 shuffle 作为输入的 Stage:这类 Stage 中出现 GC 的通常原因也是和 shuffle 有关,常见原因是某一个或多个 group 的数据过多,也就是所谓的数据倾斜,最简单的办法就是增加 shuffle 的 task 数量,比如在 SparkSQL 中设置SET spark.sql.shuffle.partitions=400
,如果调大 shuffle 的 task 无法解决问题,说明你的数据倾斜很严重,某一个 group 的数据远远大于其他的 group,需要你在业务逻辑上进行调整,预先针对较大的 group 做单独处理。
集群并行度调整要点
在 Spark 集群环境下,只有足够高的并行度才能使系统资源得到充分的利用,可以通过修改 spark-env.sh 来调整 Executor 的数量和使用资源,Standalone 和 YARN 方式资源的调度管理是不同的。
在 Standalone 模式下:
(1)每个节点使用的最大内存数:SPARK_WORKER_INSTANCES * SPARK_WORKER_MEMORY
(2)每个节点的最大并发 task 数:SPARK_WORKER_INSTANCES * SPARK_WORKER_CORES
在YARN模式下:
(1)集群 task 并行度:SPARK_ EXECUTOR_INSTANCES * SPARK_EXECUTOR_CORES
(2)集群内存总量:(executor 个数) * (SPARK_EXECUTOR_MEMORY + spark.yarn.executor.memoryOverhead) + (SPARK_DRIVER_MEMORY + spark.yarn.driver.memoryOverhead)。
重点强调:Spark 对 Executor 和 Driver 额外添加堆内存大小
:
Executor 端:由 spark.yarn.executor.memoryOverhead 设置,默认值 executorMemory * 0.07 与 384 的最大值。
Driver 端:由 spark.yarn.driver.memoryOverhead 设置,默认值 driverMemory * 0.07 与 384 的最大值。
通过调整上述参数,可以提高集群并行度,让系统同时执行的任务更多,那么对于相同的任务,并行度高了,可以减少轮询次数。举例说明:如果一个 stage 有 100task,并行度为 50,那么执行完这次任务,需要轮询两次才能完成,如果并行度为 100,那么一次就可以了。
但是在资源相同的情况,并行度高了,相应的 Executor 内存就会减少,所以需要根据实际请况协调内存和 core。此外,Spark 能够非常有效的支持短时间任务(例如:200ms),因为会对所有的任务复用 JVM,这样能减小任务启动的消耗,Standalone 模式下,core 可以允许 1-2 倍于物理 core 的数量进行超配。
Level of Parallelism
。指定它以后,在进行 reduce 类型操作的时候,默认 partition 的数量就被指定了。这个参数在实际工程中通常是必不可少的,一般都要根据 input 和每个 executor 内存的大小来确定。设置level of parallelism
或者属性spark.default.parallelism
来改变并行级别,通常来说,每一个 CPU 核可以分配 2~3 个 task。
CPU core 的访问模式是共享还是独占
。即 CPU 核是被同一 host 上的 executor 共享还是瓜分并独占。比如,一台机器上共有 32 个 CPU core 的资源,同时部署了两个 executor,总内存是 50G,那么一种方式是配置spark.executor.cores
为 16,spark.executor.memory
为 20G,这样由于内存的限制,这台机器上会部署两个 executor,每个都使用 20G 内存,并且各使用 “独占” 的 16 个 CPU core 资源;而在内存资源不变的前提下,也可以让这两个 executor “共享” 这 32 个 core。根据测试,独占模式的性能要略好与共享模式。
GC调优
。打印 GC 信息:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
。要记得默认 60% 的 executor 内存可以被用来作为 RDD 的缓存,因此只有 40% 的内存可以被用来作为对象创建的空间,这一点可以通过设置spark.storage.memoryFraction
改变。如果有很多小对象创建,但是这些对象在不完全 GC 的过程中就可以回收,那么增大 Eden 区会有一定帮助。如果有任务从 HDFS 拷贝数据,内存消耗有一个简单的估算公式–比如 HDFS 的 block size 是 64MB,工作区内有 4 个 task 拷贝数据,而解压缩一个 block 要增大 3 倍大小,那么估算内存消耗就是:4364MB。另外,还有一种情况:GC 默认情况下有一个限制,默认是 GC 时间不能超过 2% 的 CPU 时间,但是如果大量对象创建(在 Spark 里很容易出现,代码模式就是一个 RDD 转下一个 RDD),就会导致大量的 GC 时间,从而出现 “OutOfMemoryError: GC overhead limit exceeded”,对于这个,可以通过设置-XX:-UseGCOverheadLimit
关掉它。
序列化和传输
Data Serialization,默认使用的是 Java Serialization,这个程序员最熟悉,但是性能、空间表现都比较差
。还有一个选项是 Kryo Serialization,更快,压缩率也更高,但是并非支持任意类的序列化。在 Spark UI 上能够看到序列化占用总时间开销的比例,如果这个比例高的话可以考虑优化内存使用和序列化。
Broadcasting Large Variables
。在 task 使用静态大对象的时候,可以把它 broadcast 出去。Spark 会打印序列化后的大小,通常来说如果它超过 20KB 就值得这么做。有一种常见情形是,一个大表 join 一个小表,把小表 broadcast 后,大表的数据就不需要在各个 node 之间疯跑,安安静静地呆在本地等小表 broadcast 过来就好了。
Data Locality
。数据和代码要放到一起才能处理,通常代码总比数据要小一些,因此把代码送到各处会更快。Data Locality 是数据和处理的代码在屋里空间上接近的程度:PROCESS_LOCAL(同一个 JVM)、NODE_LOCAL(同一个 node,比如数据在 HDFS 上,但是和代码在同一个 node)、NO_PREF、RACK_LOCAL(不在同一个 server,但在同一个机架)、ANY。当然优先级从高到低,但是如果在空闲的 executor 上面没有未处理数据了,那么就有两个选择:
(1)要么等如今繁忙的 CPU 闲下来处理尽可能“本地”的数据,
(2)要么就不等直接启动 task 去处理相对远程的数据。
默认当这种情况发生 Spark 会等一会儿(spark.locality),即策略(1),如果繁忙的 CPU 停不下来,就会执行策略(2)。
代码里对大对象的引用
。在 task 里面引用大对象的时候要小心,因为它会随着 task 序列化到每个节点上去,引发性能问题。只要序列化的过程不抛出异常,引用对象序列化的问题事实上很少被人重视。如果,这个大对象确实是需要的,那么就不如干脆把它变成 RDD 好了。绝大多数时候,对于大对象的序列化行为,是不知不觉发生的,或者说是预期之外的,比如在我们的项目中有这样一段代码:
rdd.map(r => {
println(BackfillTypeIndex)
})
其实呢,它等价于这样:
rdd.map(r => {
println(this.BackfillTypeIndex)
})
不要小看了这个 this,有时候它的序列化是非常大的开销。
对于这样的问题,一种最直接的解决方法就是:
val dereferencedVariable = this.BackfillTypeIndex
rdd.map(r => println(dereferencedVariable)) // "this" is not serialized
相关地,注解 @transient 用来标识某变量不要被序列化
,这对于将大对象从序列化的陷阱中排除掉是很有用的。另外,注意 class 之间的继承层级关系,有时候一个小的 case class 可能来自一棵大树。
文件读写
文件存储和读取的优化
。比如对于一些 case 而言,如果只需要某几列,使用 rcfile 和 parquet 这样的格式会大大减少文件读取成本。再有就是存储文件到 S3 上或者 HDFS 上,可以根据情况选择更合适的格式,比如压缩率更高的格式
。另外,特别是对于 shuffle 特别多的情况,考虑留下一定量的额外内存给操作系统作为操作系统的 buffer cache
,比如总共 50G 的内存,JVM 最多分配到 40G 多一点。
文件分片
。比如在 S3 上面就支持文件以分片形式存放,后缀是 partXX。使用 coalesce 方法来设置分成多少片,这个调整成并行级别或者其整数倍可以提高读写性能。但是太高太低都不好,太低了没法充分利用 S3 并行读写的能力,太高了则是小文件太多,预处理、合并、连接建立等等都是时间开销啊,读写还容易超过 throttle。
任务调整要点
Spark 的 Speculation
。通过设置spark.speculation
等几个相关选项,可以让 Spark 在发现某些 task 执行特别慢的时候,可以在不等待完成的情况下被重新执行,最后相同的 task 只要有一个执行完了,那么最快执行完的那个结果就会被采纳。
减少Shuffle
。其实 Spark 的计算往往很快,但是大量开销都花在网络和 IO 上面,而 shuffle 就是一个典型。举个例子,如果 (k, v1) join (k, v2) => (k, v3),那么,这种情况其实 Spark 是优化得非常好的,因为需要 join 的都在一个 node 的一个 partition 里面,join 很快完成,结果也是在同一个 node(这一系列操作可以被放在同一个 stage 里面)。但是如果数据结构被设计为 (obj1) join (obj2) => (obj3),而其中的 join 条件为 obj1.column1 == obj2.column1,这个时候往往就被迫 shuffle 了,因为不再有同一个 key 使得数据在同一个 node 上的强保证。在一定要 shuffle 的情况下,尽可能减少 shuffle 前的数据规模,比如这个避免 groupByKey 的例子。下面这个比较的图片来自 Spark Summit 2013 的一个演讲,讲的是同一件事情:
Repartition
。运算过程中数据量时大时小,选择合适的 partition 数量关系重大,如果太多 partition 就导致有很多小任务和空任务产生;如果太少则导致运算资源没法充分利用,必要时候可以使用 repartition 来调整,不过它也不是没有代价的,其中一个最主要代价就是 shuffle。再有一个常见问题是数据大小差异太大,这种情况主要是数据的 partition 的 key 其实取值并不均匀造成的(默认使用 HashPartitioner),需要改进这一点,比如重写 hash 算法。测试的时候想知道 partition 的数量可以调用rdd.partitions().size()
获知。
Task时间分布
。关注 Spark UI,在 Stage 的详情页面上,可以看得到 shuffle 写的总开销,GC 时间,当前方法栈,还有 task 的时间花费。如果你发现 task 的时间花费分布太散,就是说有的花费时间很长,有的很短,这就说明计算分布不均,需要重新审视数据分片、key 的 hash、task 内部的计算逻辑等等,瓶颈出现在耗时长的 task 上面。
重用资源
。有的资源申请开销巨大,而且往往相当有限,比如建立连接,可以考虑在 partition 建立的时候就创建好(比如使用 mapPartition 方法),这样对于每个 partition 内的每个元素的操作,就只要重用这个连接就好了,不需要重新建立连接。
同时 Spark 的任务数量是由 stage 中的起始的所有 RDD 的 partition 之和数量决定,所以需要了解每个 RDD 的 partition 的计算方法。以 Spark 应用从 HDFS 读取数据为例,HadoopRDD 的 partition 切分方法完全继承于 MapReduce 中的 FileInputFormat,具体的 partition 数量由 HDFS 的块大小、mapred.min.split.size 的大小、文件的压缩方式等多个因素决定,详情需要参见 FileInputFormat 的代码。
开启推测机制
推测机制后,如果集群中,某一台机器的几个 task 特别慢,推测机制会将任务分配到其他机器执行,最后 Spark 会选取最快的作为最终结果。在 spark-default.conf 中添加:
spark.speculation true
。
推测机制与以下几个参数有关:
(1)spark.speculation.interval 100 #检测周期,单位毫秒
(2)spark.speculation.quantile 0.75 #完成 task 的百分比时启动推测
(3)spark.speculation.multiplier 1.5 #比其他的慢多少倍时启动推测
**什么是数据倾斜?**对 Spark/Hadoop 这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。
何谓数据倾斜?数据倾斜指的是,并行处理的数据集中,某一部分(如 Spark 或 Kafka 的一个 Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。
如果数据倾斜没有解决,完全没有可能进行性能调优,其他所有的调优手段都是一个笑话。数据倾斜是最能体现一个 spark 大数据工程师水平的性能调优问题。
数据倾斜如果能够解决的话,代表对 spark 运行机制了如指掌。数据倾斜俩大直接致命后果。
(1)数据倾斜直接会导致一种情况:Out Of Memory。
(2)运行速度慢,特别慢,非常慢,极端的慢,不可接受的慢。
我们以 100 亿条数据为列子。
个别 Task(80 亿条数据的那个 Task)处理过度大量数据。导致拖慢了整个 Job 的执行时间。这可能导致该 Task 所在的机器 OOM,或者运行速度非常慢。
数据倾斜是如何造成的呢?
在 Shuffle 阶段。同样 Key 的数据条数太多了。导致了某个 key(上图中的 80 亿条)所在的 Task 数据量太大了。远远超过其他 Task 所处理的数据量。
而这样的场景太常见了。二八定律可以证实这种场景。
搞定数据倾斜需要:
(1)搞定 shuffle
(2)搞定业务场景
(3)搞定 cpu core 的使用情况
(4)搞定 OOM 的根本原因等
所以搞定了数据倾斜需要对至少以上的原理了如指掌。所以搞定数据倾斜是关键中的关键。
一个经验结论是:一般情况下,OOM 的原因都是数据倾斜。某个 task 任务数据量太大,GC 的压力就很大。这比不了 Kafka,因为 kafka 的内存是不经过 JVM 的,是基于 Linux 内核的 Page。
数据倾斜的原理很简单:在进行 shuffle 的时候,必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处理,比如按照 key 进行聚合或 join 等操作。此时如果某个 key 对应的数据量特别大的话,就会发生数据倾斜。比如大部分 key 对应 10 条数据,但是个别 key 却对应了 100 万条数据,那么大部分 task 可能就只会分配到 10 条数据,然后 1 秒钟就运行完了;但是个别 task 可能分配到了 100 万数据,要运行一两个小时。因此,整个 Spark 作业的运行进度是由运行时间最长的那个 task 决定的。
因此出现数据倾斜的时候,Spark 作业看起来会运行得非常缓慢,甚至可能因为某个 task 处理的数据量过大导致内存溢出。
下图就是一个很清晰的例子:hello 这个 key,在三个节点上对应了总共 7 条数据,这些数据都会被拉取到同一个 task 中进行处理;而 world 和 you 这两个 key 分别才对应 1 条数据,所以另外两个 task 只要分别处理 1 条数据即可。此时第一个 task 的运行时间可能是另外两个 task 的 7 倍,而整个 stage 的运行速度也由运行最慢的那个 task 所决定。
[外链图片转存失败(img-iBF1PXvk-1562758075577)(https://s2.ax1x.com/2019/05/04/Eame8e.png)]
由于同一个 Stage 内的所有 Task 执行相同的计算,在排除不同计算节点计算能力差异的前提下,不同 Task 之间耗时的差异主要由该 Task 所处理的数据量决定。
数据倾斜只会发生在 shuffle 过程中
。这里给大家罗列一些常用的并且可能会触发 shuffle 操作的算子
:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。
1、某个 task 执行特别慢的情况
首先要看的,就是数据倾斜发生在第几个 stage 中。
可以通过 Spark Web UI 来查看当前运行到了第几个 stage,看一下当前这个 stage 各个 task 分配的数据量,从而进一步确定是不是 task 分配的数据不均匀导致了数据倾斜。
比如下图中,倒数第三列显示了每个 task 的运行时间。明显可以看到,有的 task 运行特别快,只需要几秒钟就可以运行完;而有的 task 运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个 task 处理的数据量,明显可以看到,运行时间特别短的 task 只需要处理几百 KB 的数据即可,而运行时间特别长的 task 需要处理几千 KB 的数据,处理的数据量差了 10 倍。此时更加能够确定是发生了数据倾斜。
知道数据倾斜发生在哪一个 stage 之后,接着我们就需要根据 stage 划分原理,推算出来发生倾斜的那个 stage 对应代码中的哪一部分,这部分代码中肯定会有一个 shuffle 类算子。精准推算 stage 与代码的对应关系,这里介绍一个相对简单实用的推算方法:只要看到 Spark 代码中出现了一个 shuffle 类算子或者是 Spark SQL 的 SQL 语句中出现了会导致 shuffle 的语句(比如 group by 语句),那么就可以判定,以那个地方为界限划分出了前后两个 stage。
这里我们就以 Spark 最基础的入门程序–单词计数来举例,如何用最简单的方法大致推算出一个 stage 对应的代码。如下示例,在整个代码中,只有一个 reduceByKey 是会发生 shuffle 的算子,因此就可以认为,以这个算子为界限,会划分出前后两个 stage。
stage0,主要是执行从 textFile 到 map 操作,以及执行 shuffle write 操作。shuffle write 操作,我们可以简单理解为对 pairs RDD 中的数据进行分区操作,每个 task 处理的数据中,相同的 key 会写入同一个磁盘文件内。
stage1,主要是执行从 reduceByKey 到 collect 操作,stage1 的各个 task 一开始运行,就会首先执行 shuffle read 操作。执行 shuffle read 操作的 task,会从 stage0 的各个 task 所在节点拉取属于自己处理的那些 key,然后对同一个 key 进行全局性的聚合或 join 等操作,在这里就是对 key 的 value 值进行累加。stage1 在执行完 reduceByKey 算子之后,就计算出了最终的 wordCounts RDD,然后会执行 collect 算子,将所有数据拉取到 Driver 上,供我们遍历和打印输出。
示例代码:
val conf = new SparkConf()
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.collect().foreach(println(_))
通过对单词计数程序的分析,希望能够让大家了解最基本的 stage 划分的原理,以及 stage 划分后 shuffle 操作是如何在两个 stage 的边界处执行的。然后我们就知道如何快速定位出发生数据倾斜的 stage 对应代码的哪一个部分了。比如我们在 Spark Web UI 或者本地 log 中发现,stage1 的某几个 task 执行得特别慢,判定 stage1 出现了数据倾斜,那么就可以回到代码中定位出 stage1 主要包括了 reduceByKey 这个 shuffle 类算子,此时基本就可以确定是由 reduceByKey 算子导致的数据倾斜问题。比如某个单词出现了 100 万次,其他单词才出现 10 次,那么 stage1 的某个 task 就要处理 100 万数据,整个 stage 的速度就会被这个 task 拖慢。
2、某个 task 莫名其妙内存溢出的情况
这种情况下去定位出问题的代码就比较容易了。我们建议直接看 yarn-client 模式下本地 log 的异常栈,或者是通过 YARN 查看 yarn-cluster 模式下的 log 中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有 shuffle 类算子,此时很可能就是这个算子导致了数据倾斜。
但是大家要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的 bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过 Spark Web UI 查看报错的那个 stage 的各个 task 的运行时间以及分配的数据量
,才能确定是否是由于数据倾斜才导致了这次内存溢出。
3、查看导致数据倾斜的 key 的数据分布情况
知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了 shuffle 操作并且导致了数据倾斜的 RDD/Hive 表,查看一下其中 key 的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同的 key 分布与不同的 shuffle 算子组合起来的各种情况,可能需要选择不同的技术方案来解决。
此时根据你执行操作的情况不同,可以有很多种查看 key 分布的方式:
如果是 Spark SQL 中的 group by、join 语句导致的数据倾斜,那么就查询一下 SQL 中使用的表的 key 分布情况。
如果是对 Spark RDD 执行 shuffle 算子导致的数据倾斜,那么可以在 Spark 作业中加入查看 key 分布的代码,比如 RDD.countByKey()。然后对统计出来的各个 key 出现的次数,collect/take 到客户端打印一下,就可以看到 key 的分布情况。
举例来说,对于上面所说的单词计数程序,如果确定了是 stage1 的 reduceByKey 算子导致了数据倾斜,那么就应该看看进行 reduceByKey 操作的 RDD 中的 key 分布情况,在这个例子中指的就是 pairs RDD 。如下示例,我们可以先对 pairs 采样 10% 的样本数据,然后使用 countByKey 算子统计出每个 key 出现的次数,最后在客户端遍历和打印样本数据中各个 key 的出现次数。
示例代码:
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
1、尽量避免数据源的数据倾斜
• 比如数据源是Kafka
以 Spark Stream 通过 DirectStream 方式读取 Kafka 数据为例。由于 Kafka 的每一个 Partition 对应 Spark 的一个 Task(Partition),所以 Kafka 内相关 Topic 的各 Partition 之间数据是否平衡,直接决定 Spark 处理该数据时是否会产生数据倾斜。
Kafka 某一 Topic 内消息在不同 Partition 之间的分布,主要由 Producer 端所使用的 Partition 实现类决定。如果使用随机 Partitioner,则每条消息会随机发送到一个 Partition 中,从而从概率上来讲,各 Partition 间的数据会达到平衡。此时源 Stage(直接读取 Kafka 数据的 Stage)不会产生数据倾斜。
但很多时候,业务场景可能会要求将具备同一特征的数据顺序消费,此时就需要将具有相同特征的数据放于同一个 Partition 中。一个典型的场景是,需要将同一个用户相关的 PV 信息置于同一个 Partition 中。此时,如果产生了数据倾斜,则需要通过其它方式处理。
• 比如数据源是Hive
导致数据倾斜的是 Hive 表。如果该 Hive 表中的数据本身很不均匀(比如某个 key 对应了 100 万数据,其他 key 才对应了10条数据),而且业务场景需要频繁使用 Spark 对 Hive 表执行某个分析操作,那么比较适合使用这种技术方案。
方案实现思路
:此时可以评估一下,是否可以通过 Hive 来进行数据预处理(即通过 Hive ETL 预先对数据按照 key 进行聚合,或者是预先和其他表进行 join),然后在 Spark 作业中针对的数据源就不是原来的 Hive 表了,而是预处理后的 Hive 表。此时由于数据已经预先进行过聚合或 join 操作了,那么在 Spark 作业中也就不需要使用原先的 shuffle 类算子执行这类操作了。
方案实现原理
:这种方案从根源上解决了数据倾斜,因为彻底避免了在 Spark 中执行 shuffle 类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀
的问题,所以 Hive ETL 中进行 group by 或者 join 等 shuffle 操作时,还是会出现数据倾斜,导致 Hive ETL 的速度很慢。我们只是把数据倾斜的发生提前到了 Hive ETL 中,避免 Spark 程序发生数据倾斜而已。
方案优点
:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark 作业的性能会大幅度提升。
方案缺点
:治标不治本,Hive ETL 中还是会发生数据倾斜。
方案实践经验
:在一些 Java 系统与 Spark 结合使用的项目中,会出现 Java 代码频繁调用 Spark 作业的场景,而且对 Spark 作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的 Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次 Java 调用 Spark 作业时,执行速度都会很快,能够提供更好的用户体验。
项目实践经验
:在美团·点评的交互式用户行为分析系统
中使用了这种方案,该系统主要是允许用户通过 Java Web 系统提交数据分析统计任务,后端通过 Java 提交 Spark 作业进行数据分析统计。要求 Spark 作业速度必须要快,尽量在 10 分钟以内,否则速度太慢,用户体验会很差。所以我们将有些 Spark 作业的 shuffle 操作提前到了 Hive ETL 中,从而让 Spark 直接使用预处理的 Hive 中间表,尽可能地减少 Spark 的 shuffle 操作,大幅度提升了性能,将部分作业的性能提升了 6 倍以上。
2、调整并行度:分散同一个 Task 的不同 Key
方案适用场景
:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。
方案实现思路
:在对 RDD 执行 shuffle 算子时,给 shuffle 算子传入一个参数,比如 reduceByKey(1000),该参数就设置了这个 shuffle 算子执行时 shuffle read task 的数量。对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等,需要设置一个参数,即spark.sql.shuffle.partitions
,该参数代表了 shuffle read task 的并行度,该值默认是 200,对于很多场景来说都有点过小。
方案实现原理
:增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task,从而让每个 task 处理比原来更少的数据。举例来说,如果原本有 5 个 key,每个 key 对应 10 条数据,这 5 个 key 都是分配给一个 task 的,那么这个 task 就要处理 50 条数据。而增加了 shuffle read task 以后,每个 task 就分配到一个 key,即每个 task 就处理 10 条数据,那么自然每个 task 的执行时间都会变短了。具体原理如下图所示。
方案优点
:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。
方案缺点
:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。
方案实践经验
:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个 key 对应的数据量有 100 万,那么无论你的 task 数量增加到多少,这个对应着 100 万数据的 key 肯定还是会分配到一个 task 中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用最简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。
方案实现原理
:Spark 在做 Shuffle 时,默认使用 HashPartitioner(非 Hash Shuffle)对数据进行分区
。如果并行度设置的不合适,可能造成大量不相同的 Key 对应的数据被分配到了同一个 Task 上,造成该 Task 所处理的数据远大于其它 Task,从而造成数据倾斜。
如果调整 Shuffle 时的并行度,使得原本被分配到同一 Task 的不同 Key 发配到不同 Task 上处理,则可降低原 Task 所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。
案例:
现有一张测试数据集,内有 100 万条数据,每条数据有一个唯一的 id 值。现通过一些处理,使得 id 为 90 万之下的所有数据对 12 取模后余数为 8(即在 Shuffle 并行度为 12 时该数据集全部被 HashPartition 分配到第 8 个 Task),其它数据集 id 不变,从而使得 id 大于 90 万的数据在 Shuffle 时可被均匀分配到所有 Task 中,而 id 小于 90 万的数据全部分配到同一个 Task 中。处理过程如下:
Step1:准备原始数据
原始数据格式:
20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/
20111230000005 66c5bb7774e31d0a22278249b26bc83a 凡人修仙传 3 1 http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1
20111230000007 b97920521c78de70ac38e3713f524b50 本本联盟 1 1 http://www.bblianmeng.com/
20111230000008 6961d0c97fe93701fc9c0d861d096cd9 华南师范大学图书馆 1 1 http://lib.scnu.edu.cn/
......
......
数据说明:
==========数据格式==========
访问时间 用户id 查询词 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL
20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/
==========数据注意==========
1、其中用户 ID 是根据用户使用浏览器访问搜索引擎时的 Cookie 信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户 ID。
2、数据字段之间用“\t”进行分割
Step2:给原始数据增加 ID 属性
处理原理
:将 RDD 通过 zipWithIndex 实现 ID 添加,将 RDD 以制表符分割并转换为 ArrayBuffer,然后通过 mkString 将数据以 Text 输出。
(1)将原始数据上传到到 HDFS 上
[atguigu@hadoop102 software]$ pwd /opt/software [atguigu@hadoop102 software]$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put ./source.txt / SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] [atguigu@hadoop102 software]$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -ls / SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Found 11 items drwxr-xr-x - atguigu supergroup 0 2019-04-28 18:26 /data drwxr-xr-x - atguigu supergroup 0 2019-05-03 01:17 /directory drwxr-xr-x - atguigu supergroup 0 2019-04-17 19:56 /event_logs -rw-r--r-- 1 atguigu supergroup 1247306 2019-04-30 19:03 /graphx-wiki-edges.txt -rw-r--r-- 1 atguigu supergroup 946608 2019-04-30 19:04 /graphx-wiki-vertices.txt drwxr-xr-x - atguigu supergroup 0 2019-04-30 09:34 /hbase -rw-r--r-- 1 atguigu supergroup 114845849 2019-05-03 10:12 /source.txt drwxr-xr-x - atguigu supergroup 0 2019-04-29 11:26 /spark drwxr-xr-x - atguigu supergroup 0 2019-04-28 00:24 /spark_warehouse drwxrwx--- - atguigu supergroup 0 2019-04-18 10:23 /tmp drwxr-xr-x - atguigu supergroup 0 2019-04-22 11:23 /user [atguigu@hadoop102 software]$
(2)通过 spark-shell 加载原始数据并转换输出
scala> val sourceRdd = sc.textFile("hdfs://hadoop102:9000/source.txt")
sourceRdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/source.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> val sourceWithIndexRdd = sourceRdd.zipWithIndex.map(tuple => {
val array = scala.collection.mutable.ArrayBuffer[String](); array++=(tuple._1.split("\t")); tuple._2.toString +=: array; array.toArray})
sourceWithIndexRdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:26
scala> sourceWithIndexRdd.map(_.mkString("\t")).saveAsTextFile("hdfs://hadoop102:9000/source_index")
HDFS 上查看转换后的结果
[atguigu@hadoop102 ~]$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -ls /source_index
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Found 3 items
-rw-r--r-- 1 atguigu supergroup 0 2019-05-03 10:18 /source_index/_SUCCESS
-rw-r--r-- 1 atguigu supergroup 60813047 2019-05-03 10:18 /source_index/part-00000
-rw-r--r-- 1 atguigu supergroup 60921692 2019-05-03 10:18 /source_index/part-00001
Step3:通过 spark-shell 加载新的数据并进行对应处理
#加载添加了id的原始数据 scala> val sourceRdd = sc.textFile("hdfs://hadoop102:9000/source_index") sourceRdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/source_index MapPartitionsRDD[1] at textFile at <console>:24 #新建一个 case 类代表数据集 scala> case class brower(id: Int, time: Long, uid: String, keyword: String, url_rank: Int, click_num: Int, click_url: String) extends Serializable defined class brower #通过 case 类创建 Dataset scala> val ds = sourceRdd.map(_.split("\t")).map(attr => brower(attr(0).toInt, attr(1).toLong, attr(2), attr(3), attr(4).toInt, attr(5).toInt, attr(6))).toDS ds: org.apache.spark.sql.Dataset[brower] = [id: int, time: bigint ... 5 more fields] #注册一个临时表 scala> ds.createOrReplaceTempView("sourceTable") #执行新的查询 scala> val newSource = spark.sql("SELECT CASE WHEN id < 900000 THEN (8 + (CAST (RAND() * 50000 AS bigint)) * 12 ) ELSE id END, time, uid, keyword, url_rank, click_num, click_url FROM sourceTable") newSource: org.apache.spark.sql.DataFrame = [CASE WHEN (id < 900000) THEN (CAST(8 AS BIGINT) + (CAST((rand(-5486683549522524104) * CAST(50000 AS DOUBLE)) AS BIGINT) * CAST(12 AS BIGINT))) ELSE CAST(id AS BIGINT) END: bigint, time: bigint ... 5 more fields] #将 900000 之前的 ID 设定为 12 取余为 8 的 ID 集,当并行度为 12 时,会通过 hash 分区器分区到第 8 个任务 #输出新的测试数据 scala> newSource.rdd.map(_.mkString("\t")).saveAsTextFile("hdfs://hadoop102:9000/test_data")
Step4:通过上述处理,一份可能造成后续数据倾斜的测试数据已经准备好
接下来,使用 Spark 读取该测试数据,并通过 groupByKey(12) 对 id 分组处理,且 Shuffle 并行度为 12。代码如下:
scala> val sourceRdd = sc.textFile("hdfs://hadoop102:9000/test_data/p*")
sourceRdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/test_data/p* MapPartitionsRDD[1] at textFile at <console>:24
scala> val kvRdd = sourceRdd.map(x => {
val parm = x.split("\t"); (parm(0).trim().toInt, parm(1).trim()) })
kvRdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[2] at map at <console>:26
scala> kvRdd.groupByKey(12).count
res0: Long = 150000
scala> :quit
本次实验所使用集群节点数为 3,每个节点可被 Yarn 使用的 CPU 核数为 3,内存为 2GB。在 Spark-shell 中进行提交。
GroupBy Stage 的 Task 状态如下图所示,Task 8 处理的记录数为 90 万,远大于(9 倍于)其它 11 个 Task 处理的 10 万记录。而 Task 8 所耗费的时间为1 秒,远高于其它 11 个 Task 的平均时间。整个 Stage 的时间也为 1 秒,该时间主要由最慢的 Task 8 决定。数据之间处理的比例最大为 105 倍。
在这种情况下,可以通过调整 Shuffle 并行度,使得原来被分配到同一个 Task(即该例中的 Task 8)的不同 Key 分配到不同 Task,从而降低 Task 8 所需处理的数据量,缓解数据倾斜。
通过 groupByKey(17) 将 Shuffle 并行度调整为 17,重新提交到 Spark。新的 Job 的 GroupBy Stage 所有 Task 状态如下图所示。
scala> val sourceRdd = sc.textFile("hdfs://hadoop102:9000/test_data/p*")
sourceRdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/test_data/p* MapPartitionsRDD[1] at textFile at <console>:24
scala> val kvRdd = sourceRdd.map(x =>{
val parm=x.split("\t");(parm(0).trim().toInt, parm(1).trim()) })
kvRdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[2] at map at <console>:26
scala> kvRdd.groupByKey(17).count
res0: Long = 150000
scala> :quit
从上图可知,相比以上次一计算,目前每一个计算的数据都比较平均,数据之间的最大比例基本为 1:1,总体时间降到了 0.8 秒。
在这种场景下,调整并行度,并不意味着一定要增加并行度,也可能是减小并行度。如果通过 groupByKey(7) 将 Shuffle 并行度调整为 7,重新提交到 Spark。新 Job 的 GroupBy Stage 的所有 Task 状态如下图所示。
从上图可见,处理记录数都比较平均。
总结:
适用场景
:大量不同的 Key 被分配到了相同的 Task 造成该 Task 数据量过大。
解决方案
:调整并行度。一般是增大并行度,但有时如本例减小并行度也可达到效果。
方案优点
:实现简单,可在需要 Shuffle 的操作算子上直接设置并行度或者使用spark.default.parallelism
设置。如果是 Spark SQL,还可通过SET spark.sql.shuffle.partitions=[num_tasks]
设置并行度。可用最小的代价解决问题。一般如果出现数据倾斜,都可以通过这种方法先试验几次,如果问题未解决,再尝试其它方法。
方案缺点
:适用场景少,只能将分配到同一 Task 的不同 Key 分散开,但对于同一 Key 倾斜严重的情况该方法并不适用。并且该方法一般只能缓解数据倾斜,没有彻底消除问题。从实践经验来看,其效果一般。
3、自定义 Partitioner
方案原理
:使用自定义的 Partitioner(默认为 HashPartitioner),将原本被分配到同一个 Task 的不同 Key 分配到不同 Task。
案例:
以上述数据集为例,继续将并发度设置为 12,但是在 groupByKey 算子上,使用自定义的 Partitioner,实现如下:
class CustomerPartitioner(numParts: Int) extends org.apache.spark.Partitioner {
// 覆盖分区数
override def numPartitions: Int = numParts
// 覆盖分区号获取函数
override def getPartition(key: Any): Int = {
val id: Int = key.toString.toInt
if (id <= 900000)
return new java.util.Random().nextInt(100) % 12
else
return id % 12
}
}
执行如下代码:
scala> :paste // Entering paste mode (ctrl-D to finish) class CustomerPartitioner(numParts: Int) extends org.apache.spark.Partitioner { // 覆盖分区数 override def numPartitions: Int = numParts // 覆盖分区号获取函数 override def getPartition(key: Any): Int = { val id: Int = key.toString.toInt if (id <= 900000) return new java.util.Random().nextInt(100) % 12 else return id % 12 } } // Exiting paste mode, now interpreting. defined class CustomerPartitioner scala> val sourceRdd = sc.textFile("hdfs://hadoop102:9000/test_data/p*") sourceRdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/test_data/p* MapPartitionsRDD[10] at textFile at <console>:24 scala> val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t");(parm(0).trim().toInt, parm(1).trim()) }) kvRdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[11] at map at <console>:26 scala> kvRdd.groupByKey(new CustomerPartitioner(12)).count res5: Long = 565650 scala> :quit
由下图可见,使用自定义 Partition 后,各 Task 所处理的数据集大小相当。
总结:
方案适用场景
:大量不同的 Key 被分配到了相同的 Task 造成该 Task 数据量过大。
解决方案
:使用自定义的 Partitioner 实现类代替默认的 HashPartitioner,尽量将所有不同的 Key 均匀分配到不同的 Task 中。
方案优点
:不影响原有的并行度设计。如果改变并行度,后续 Stage 的并行度也会默认改变,可能会影响后续 Stage。
方案缺点
:适用场景有限,只能将不同 Key 分散开,对于同一 Key 对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的 Partitioner,不够灵活。
4、将 Reduce side Join 转变为 Map side Join
方案适用场景
:在对 RDD 使用 join 类操作,或者是在 Spark SQL 中使用 join 语句时,而且 join 操作中的一个 RDD 或表的数据量比较小(比如几百M 或者一两 G),比较适用此方案。
方案实现思路
:不使用 join 算子进行连接操作,而使用 Broadcast 变量与 map 类算子实现 join 操作,进而完全规避掉 shuffle 类的操作,彻底避免数据倾斜的发生和出现。将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的内存中来,然后对其创建一个 Broadcast 变量;接着对另外一个 RDD 执行 map 类算子,在算子函数内,从 Broadcast 变量中获取较小 RDD 的全量数据,与当前 RDD 的每一条数据按照连接 key 进行比对,如果连接 key 相同的话,那么就将两个 RDD 的数据用你需要的方式连接起来。
方案实现原理
:普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。但是如果一个 RDD 是比较小的,则可以采用广播小 RDD 全量数据 +map 算子来实现与 join 同样的效果
,也就是 map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。具体原理如下图所示。
方案优点
:对 join 操作导致的数据倾斜,效果非常好,因为根本就不会发生 shuffle,也就根本不会发生数据倾斜。
方案缺点
:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况
。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,Driver 和每个 Executor 内存中都会驻留一份小 RDD 的全量数据。如果我们广播出去的 RDD 数据比较大,比如 10G 以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。
通过 Spark 的 Broadcast 机制,将 Reduce 侧 Join 转化为 Map 侧 Join,避免 Shuffle 从而完全消除 Shuffle 带来的数据倾斜。
案例1:
Step1:准备数据
scala> val sourceRdd = sc.textFile("hdfs://hadoop102:9000/source_index/p*")
scala> val kvRdd = sourceRdd.map(x =>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。