赞
踩
对于 Spark 性能调优来说,应用开发和配置项设置是两个最主要也最常用的入口。但在日常的调优工作中,每当我们需要从配置项入手寻找调优思路的时候,一打开 Spark 官网的 Configuration 页面,映入眼帘的就是上百个配置项。它们有的需要设置 True 或 False,有的需要给定明确的数值才能使用。这难免让我们蒙头转向、无所适从。
事实上,能够显著影响执行性能的配置项屈指可数,更何况在 Spark 分布式计算环境中,计算负载主要由 Executors 承担,Driver 主要负责分布式调度,调优空间有限,因此对 Driver 端的配置项我们不作考虑,我们要汇总的配置项都围绕 Executors 展开。那么,结合过往的实践经验,以及对官网全量配置项的梳理,我把它们划分为 3 类,分别是硬件资源类、Shuffle 类和 Spark SQL 大类。
首先,硬件资源类包含的是与 CPU、内存、磁盘有关的配置项。我们说过,调优的切入点是瓶颈,定位瓶颈的有效方法之一,就是从硬件的角度出发,观察某一类硬件资源的负载与消耗,是否远超其他类型的硬件,而且调优的过程收敛于所有硬件资源平衡、无瓶颈的状态,所以掌握资源类配置项就至关重要了。这类配置项设置得是否得当,决定了应用能否打破瓶颈,来平衡不同硬件的资源利用率。
其次,Shuffle 类是专门针对 Shuffle 操作的。在绝大多数场景下,Shuffle 都是性能瓶颈。因此,我们需要专门汇总这些会影响 Shuffle 计算过程的配置项。同时,Shuffle 的调优难度也最高,汇总 Shuffle 配置项能帮我们在调优的过程中锁定搜索范围,充分节省时间。
最后,Spark SQL 早已演化为新一代的底层优化引擎。无论是在 Streaming、Mllib、Graph 等子框架中,还是在 PySpark 中,只要你使用 DataFrame API,Spark 在运行时都会使用 Spark SQL 做统一优化。因此,我们需要梳理出一类配置项,去充分利用 Spark SQL 的先天性能优势。
首先,我们先来说说与 CPU 有关的配置项,主要包括 spark.cores.max、spark.executor.cores 和 spark.task.cpus 这三个参数。它们分别从集群、Executor 和计算任务这三个不同的粒度,指定了用于计算的 CPU 个数。开发者通过它们就可以明确有多少 CPU 资源被划拨给 Spark 用于分布式计算。
为了充分利用划拨给 Spark 集群的每一颗 CPU,准确地说是每一个 CPU 核(CPU Core),你需要设置与之匹配的并行度,并行度用 spark.default.parallelism 和 spark.sql.shuffle.partitions 这两个参数设置。对于没有明确分区规则的 RDD 来说,我们用 spark.default.parallelism 定义其并行度,spark.sql.shuffle.partitions 则用于明确指定数据关联或聚合操作中 Reduce 端的分区数量。
说到并行度(Parallelism)就不得不提并行计算任务(Paralleled Tasks)了,这两个概念关联紧密但含义大相径庭,有不少同学经常把它们弄混。
并行度指的是分布式数据集被划分为多少份,从而用于分布式计算。换句话说,并行度的出发点是数据,它明确了数据划分的粒度。并行度越高,数据的粒度越细,数据分片越多,数据越分散。由此可见,像分区数量、分片数量、Partitions 这些概念都是并行度的同义词。
并行计算任务则不同,它指的是在任一时刻整个集群能够同时计算的任务数量。换句话说,它的出发点是计算任务、是 CPU,由与 CPU 有关的三个参数共同决定。具体说来,Executor 中并行计算任务数的上限是 spark.executor.cores 与 spark.task.cpus 的商,暂且记为 #Executor-tasks,整个集群的并行计算任务数自然就是 #Executor-tasks 乘以集群内 Executors 的数量,记为 #Executors。因此,最终的数值是:#Executor-tasks * #Executors。
我们不难发现,并行度决定了数据粒度,数据粒度决定了分区大小,分区大小则决定着每个计算任务的内存消耗。在同一个 Executor 中,多个同时运行的计算任务“基本上”是平均瓜分可用内存的,每个计算任务能获取到的内存空间是有上限的,因此并行计算任务数会反过来制约并行度的设置。你看,这两个家伙还真是一对相爱相杀的冤家!
至于,到底该怎么平衡并行度与并行计算任务两者之间的关系,我们留到后面的课程去展开。这里,咱们只要记住和 CPU 设置有关配置项的含义、区别与作用就行了。
spark.sql.shuffle.partitions 只有 ds 和 df 生效,就是说也对上层 spark sql 生效,对 raw rdd 不生效
spark.default.parallelism 只对 raw rdd 生效,并且像文中说的没有指定分区器和分区数时,才会使用 default
我们知道,在管理模式上,Spark 分为堆内内存与堆外内存。
堆外内存又分为两个区域,Execution Memory 和 Storage Memory。要想要启用堆外内存,我们得先把参数 spark.memory.offHeap.enabled 置为 true,然后用 spark.memory.offHeap.size 指定堆外内存大小。堆内内存也分了四个区域,也就是 Reserved Memory、User Memory、Execution Memory 和 Storage Memory。
简单来说,这些配置项决定了我们刚才说的这些区域的大小,这很好理解。工具有了,但很多同学在真正设置内存区域大小的时候还会有各种各样的疑惑,比如说:
相比 JVM 堆内内存,off heap 堆外内存有很多优势,如更精确的内存占用统计、更节约、不需要垃圾回收机制,以及不需要序列化与反序列化。你可能会说:“既然堆外内存这么厉害,那我们干脆把所有内存都划分给它不就得了?”先别急着下结论,我们先一起来看一个例子。
用户表 1 记录着用户数据,每个数据条目包含 4 个字段,整型的用户 ID、String 类型的姓名、整型的年龄和 Char 类型的性别。如果现在要求你用字节数组来存储每一条用户记录,你该怎么办呢?
我们一起来做一下。首先,除姓名外其它 3 个字段都是定长数据类型,因此可以直接安插到字节数组中。对于变长数据类型如 String,由于我们事先并不知道每个用户的名字到底有多长,因此,为了把 name 字段也用字节数组的形式存储,我们只能曲线救国:先记录 name 字段的在整个字节数组内的偏移量,再记录它的长度,最后把完整的 name 字符串安插在字节数组的末尾,如下图所示。
尽管存储 String 类型的 name 字段麻烦一些,但我们总算成功地用字节数组容纳了每一条用户记录。OK,大功告成!
你可能会问:“做这个小实验的目的是啥呢?”事实上,Spark 开辟的堆外内存就是以这样的方式来存储应用数据的。正是基于这种紧凑的二进制格式,相比 JVM 堆内内存,Spark 通过 Java Unsafe API 在堆外内存中的管理,才会有那么多的优势。
不过,成也萧何败也萧何,字节数组自身的局限性也很难突破。比如说,如果用户表 1 新增了兴趣列表字段,类型为 List[String],如用户表 2 所示。这个时候,如果我们仍然采用字节数据的方式来存储每一条用户记录,不仅越来越多的指针和偏移地址会让字段的访问效率大打折扣,而且,指针越多,内存泄漏的风险越大,数据访问的稳定性就值得担忧了。
因此,当数据模式(Data Schema)开始变得复杂时,Spark 直接管理堆外内存的成本将会非常高。
那么,针对有限的内存资源,我们该如何平衡 JVM 堆内内存与 off heap 堆外内存的划分,我想你心中也该有了答案。对于需要处理的数据集,如果数据模式比较扁平,而且字段多是定长数据类型,就更多地使用堆外内存。相反地,如果数据模式很复杂,嵌套结构或变长字段很多,就更多采用 JVM 堆内内存会更加稳妥。
其实可以看到像DataFrame 和 DataSet 之类的可以更好的使用堆外内存
接下来,我们再来说说 User Memory。我们都知道,参数 spark.memory.fraction 的作用是明确 Spark 可支配内存占比,换句话说,就是在所有的堆内空间中,有多大比例的内存可供 Spark 消耗。相应地,1 - spark.memory.fraction 就是 User Memory 在堆内空间的占比。
因此,spark.memory.fraction 参数决定着两者如何瓜分堆内内存,它的系数越大,Spark 可支配的内存越多,User Memory 区域的占比自然越小。spark.memory.fraction 的默认值是 0.6,也就是 JVM 堆内空间的 60% 会划拨给 Spark 支配,剩下的 40% 划拨给 User Memory。
那么,User Memory 都用来存啥呀?需要预留那么大的空间吗?简单来说,User Memory 存储的主要是开发者自定义的数据结构,这些数据结构往往用来协助分布式数据集的处理。
/** 实现方式2 输入参数:模板文件路径,用户兴趣字符串 返回值:用户兴趣字符串对应的索引值 */ //函数定义 val findIndex: (String) => (String) => Int = { (filePath) => val source = Source.fromFile(filePath, "UTF-8") val lines = source.getLines().toArray source.close() val searchMap = lines.zip(0 until lines.size).toMap (interest) => searchMap.getOrElse(interest, -1) } val partFunc = findIndex(filePath) //Dataset中的函数调用 partFunc("体育-篮球-NBA-湖人")
我们先读取包含用户兴趣的模板文件,然后根据模板内容构建兴趣到索引的映射字典。在对千亿样本做 Lable Encoding 的时候,这个字典可以快速查找兴趣字符串,并返回对应索引,来辅助完成数据处理。像这样的映射字典就是所谓的自定义数据结构,这部分数据都存储在 User Memory 内存区域。
因此,当在 JVM 内平衡 Spark 可用内存和 User Memory 时,你需要考虑你的应用中类似的自定义数据结构多不多、占比大不大?然后再相应地调整两块内存区域的相对占比。如果应用中自定义的数据结构很少,不妨把 spark.memory.fraction 配置项调高,让 Spark 可以享用更多的内存空间,用于分布式计算和缓存分布式数据集。
执行任务与 RDD 缓存共享 Spark 可支配内存,但是, 执行任务在抢占方面有更高的优先级。
因此通常来说,在统一内存管理模式下,spark.memory.storageFraction 的设置就显得没那么紧要,因为无论这个参数设置多大,执行任务还是有机会抢占缓存内存,而且一旦完成抢占,就必须要等到任务执行结束才会释放。
不过,凡事都没有绝对,如果你的应用类型是“缓存密集型”,如机器学习训练任务,就很有必要通过调节这个参数来保障数据的全量缓存。这类计算任务往往需要反复遍历同一份分布式数据集,数据缓存与否对任务的执行效率起着决定性作用。这个时候,我们就可以把参数 spark.memory.storageFraction 调高,然后有意识地在应用的最开始把缓存灌满,再基于缓存数据去实现计算部分的业务逻辑。
但在这个过程中,你要特别注意 RDD 缓存与执行效率之间的平衡。为什么这么说呢?
首先,RDD 缓存占用的内存空间多了,Spark 用于执行分布式计算任务的内存空间自然就变少了,而且数据分析场景中常见的关联、排序和聚合等操作都会消耗执行内存,这部分内存空间变少,自然会影响到这类计算的执行效率。
其次,大量缓存引入的 GC(Garbage Collection,垃圾回收)负担对执行效率来说是个巨大的隐患。
堆内内存的垃圾回收也是一个道理,JVM 大体上把 Heap 堆内内存分为年轻代和老年代。年轻代存储生命周期较短、引用次数较低的对象;老年代则存储生命周期较长、引用次数高的对象。因此,像 RDD cache 这种一直缓存在内存里的数据,一定会被 JVM 安排到老年代。
年轻代的垃圾回收工作称为 Young GC,老年代的垃圾回收称为 Full GC。每当老年代可用内存不足时,都会触发 JVM 执行 Full GC。在 Full GC 阶段,JVM 会抢占应用程序执行线程,强行征用计算节点中所有的 CPU 线程,也就是“集中力量办大事”。当所有 CPU 线程都被拿去做垃圾回收工作的时候,应用程序的执行只能暂时搁置。只有等 Full GC 完事之后,把 CPU 线程释放出来,应用程序才能继续执行。这种 Full GC 征用 CPU 线程导致应用暂停的现象叫做“Stop the world”。
因此,Full GC 对于应用程序的伤害远大于 Young GC,并且 GC 的效率与对象个数成反比,对象个数越多,GC 效率越差。这个时候,对于 RDD 这种缓存在老年代中的数据,就很容易引入 Full GC 问题。
一般来说,为了提升 RDD cache 访问效率,很多同学都会采用以对象值的方式把数据缓存到内存,因为对象值的存储方式避免了数据存取过程中序列化与反序列化的计算开销。我们在 RDD/DataFrame/Dataset 之上调用 cache 方法的时候,默认采用的就是这种存储方式。
但是,采用对象值的方式缓存数据,不论是 RDD,还是 DataFrame、Dataset,每条数据样本都会构成一个对象,要么是开发者自定义的 Case class,要么是 Row 对象。换句话说,老年代存储的对象个数基本等于你的样本数。因此,当你的样本数大到一定规模的时候,你就需要考虑大量的 RDD cache 可能会引入的 Full GC 问题了。
基于上面的分析,我们不难发现,在打算把大面积的内存空间用于 RDD cache 之前,你需要衡量这么做可能会对执行效率产生的影响。
你可能会说:“我的应用就是缓存密集型,确实需要把数据缓存起来,有什么办法来平衡执行效率吗?”办法还是有的。
首先,你可以放弃对象值的缓存方式,改用序列化的缓存方式,序列化会把多个对象转换成一个字节数组。这样,对象个数的问题就得到了初步缓解。
其次,我们可以调节 spark.rdd.compress 这个参数。RDD 缓存默认是不压缩的,启用压缩之后,缓存的存储效率会大幅提升,有效节省缓存内存的占用,从而把更多的内存空间留给分布式任务执行。
通过这两类调整,开发者在享用 RDD 数据访问效率的同时,还能够有效地兼顾应用的整体执行效率,可谓是两全其美。不过,有得必有失,尽管这两类调整优化了内存的使用效率,但都是以引入额外的计算开销、牺牲 CPU 为代价的。这也就是我们一直强调的:性能调优的过程本质上就是不断地平衡不同硬件资源消耗的过程。
在存储系统那一讲,我们简单提到过 spark.local.dir 这个配置项,这个参数允许开发者设置磁盘目录,该目录用于存储 RDD cache 落盘数据块和 Shuffle 中间文件。
通常情况下,spark.local.dir 会配置到本地磁盘中容量比较宽裕的文件系统,毕竟这个目录下会存储大量的临时文件,我们需要足够的存储容量来保证分布式任务计算的稳定性。
不过,如果你的经费比较充裕,有条件在计算节点中配备足量的 SSD 存储,甚至是更多的内存资源,完全可以把 SSD 上的文件系统目录,或是内存文件系统添加到 spark.local.dir 配置项中去,从而提供更好的 I/O 性能。
而且可以挂载多块硬盘,或者内存充足的情况下,可以开辟内存文件系统,用于配置 spark.local.dir 参数
我们知道,Shuffle 的计算过程分为 Map 和 Reduce 这两个阶段。其中,Map 阶段执行映射逻辑,并按照 Reducer 的分区规则,将中间数据写入到本地磁盘;Reduce 阶段从各个节点下载数据分片,并根据需要实现聚合计算。
可以通过 spark.shuffle.file.buffer 和 spark.reducer.maxSizeInFlight 这两个配置项,来分别调节 Map 阶段和 Reduce 阶段读写缓冲区的大小
在 Map 阶段,计算结果会以中间文件的形式被写入到磁盘文件系统。同时,为了避免频繁的 I/O 操作,Spark 会把中间文件存储到写缓冲区(Write Buffer)。这个时候,我们可以通过设置 spark.shuffle.file.buffer 来扩大写缓冲区的大小,缓冲区越大,能够缓存的落盘数据越多,Spark 需要刷盘的次数就越少,I/O 效率也就能得到整体的提升。
在 Reduce 阶段,因为 Spark 会通过网络从不同节点的磁盘中拉取中间文件,它们又会以数据块的形式暂存到计算节点的读缓冲区(Read Buffer)。缓冲区越大,可以暂存的数据块越多,在数据总量不变的情况下,拉取数据所需的网络请求次数越少,单次请求的网络吞吐越高,网络 I/O 的效率也就越高。这个时候,我们就可以通过 spark.reducer.maxSizeInFlight 配置项控制 Reduce 端缓冲区大小,来调节 Shuffle 过程中的网络负载。
事实上,对 Shuffle 计算过程的优化牵扯到了全部的硬件资源,包括 CPU、内存、磁盘和网络。因此,我们上一讲汇总的关于 CPU、内存和硬盘的配置项,也同样可以作用在 Map 和 Reduce 阶段的内存计算过程上。
Spark 还提供了一个叫做 spark.shuffle.sort.bypassMergeThreshold 的配置项,去处理一种特殊的 Shuffle 场景
自 1.6 版本之后,Spark 统一采用 Sort shuffle manager 来管理 Shuffle 操作,在 Sort shuffle manager 的管理机制下,无论计算结果本身是否需要排序,Shuffle 计算过程在 Map 阶段和 Reduce 阶段都会引入排序操作。
在不需要聚合,也不需要排序的计算场景中,我们就可以通过设置 spark.shuffle.sort.bypassMergeThreshold 的参数,来改变 Reduce 端的并行度(默认值是 200)。当 Reduce 端的分区数小于这个设置值的时候,我们就能避免 Shuffle 在计算过程引入排序。
spark.shuffle.compress 是否对shuffle 的中间结果进行压缩,如果压缩的话使用spark.io.compression.codec
的配置进行压缩
spark.shuffle.io.maxRetries io 失败的重试次数,在大型shuffle遇到网络或者GC 问题的时候很有用。
spark.shuffle.io.retryWait i 失败的时候的等待时间
首先,对于 CPU 类配置项,我们要重点理解并行度与并行计算任务数的区别。并行度从数据的角度出发,明确了数据划分的粒度,并行度越高,数据粒度越细,数据越分散,CPU 资源利用越充分,但同时要提防数据粒度过细导致的调度系统开销。
并行计算任务数则不同,它从计算的角度出发,强调了分布式集群在任一时刻并行处理的能力和容量。并行度与并行计算任务数之间互相影响、相互制约。
其次,对于内存类配置项,我们要知道怎么设置它们来平衡不同内存区域的方法。这里我们主要搞清楚 3 个问题就可以了:
事实上,由于tungsten的优化,spark官方推荐使用堆内内存,在保证稳定性的同时,还可以大幅提升性能。
在生产环境中,当并发高使用spark sql时,spark driver(常驻)端的OOM问题是比较严重的,因为spark sql解析、dag划分、调度开销还是比较大的
需要注意的是,同一个task,不能脚踏两条船,不能同时用堆外和堆内,假设你最开始选择堆外,跑着跑着发现堆外不够用,这个时候,即使堆内还有空闲,task还是会oom。
在 Shuffle 过程中,对于不需要排序和聚合的操作,我们可以通过控制 spark.shuffle.sort.bypassMergeThreshold 参数,来避免 Shuffle 执行过程中引入的排序环节,从而避免没必要的计算开销。
对于 Spark SQL 大类我们首先要知道,AQE 默认是禁用状态,要充分利用 AQE 提供的 3 个特性,就是自动分区合并、数据倾斜处理和 Join 策略调整,我们需要把 spark.sql.adaptive.enabled 置为 true。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。