当前位置:   article > 正文

spark参数配置_spark hbo

spark hbo

参考链接
https://spark.apache.org/docs/2.2.0/configuration.html

 

分类

参数

含义

平台默认值

建议

资源申请&并行度

spark.executor.cores

一个Executor中同时可以执行的task数目(在Executor内存不变的情况下,executor-cores数越大,平均下来一个task可以使用的内存就越少)

1

  1. 默认设置,或者同时等比例增大,最高不超过默认值的3倍。

  2. 建议spark.executor.cores * spark.dynamicAllocation.maxExecutors < 8000
    (cu, 队列总配额为20000)

  3. executor的内存大小限制:
    spark.executor.memory + spark.yarn.executor.memoryOverhead <= 16G (YARN container最大内存限制)

参考Spark内存模型以及调参建议

spark.executor.memory

Executor Java进程的堆内存大小,即Executor Java进程的Xmx值

2G

spark.yarn.executor.memoryOverhead

Executor Java进程的off-heap内存,包括JVM overhead,sort、shuffle以及Netty的堆外内存

1024

(注意:该参数的单位为MB)

spark.dynamicAllocation.enabled

是否开启动态资源分配,强烈建议开启。

true

  1. 保持默认值即可

spark.dynamicAllocation.maxExecutors

开启动态资源分配后,同一时刻,最多可申请的executor个数

1000

  1. spark.executor.cores * spark.dynamicAllocation.maxExecutors < 8000 (cu)

  2. 当在Spark UI中观察到task较多时,可适当调大此参数,缩短作业执行时间。 一般保持shuffle.partitions / (maxExecutors*executor.cores)=2

spark.dynamicAllocation.minExecutors

开启动态资源分配后,某一时刻executor的最小个数。默认设置为3,即在任何时刻,作业都会保持至少有3个及以上的executor存活

3

  1. 保持默认值即可

spark.memory.fraction

存储+执行内存占节点总内存的大小,社区版是0.6。平台为了方便的把hive任务迁移到spark任务,把该区域的内存比例调小至0.3。

0.3

  1. 没有udf或者udf不会消耗太多内存的任务可以调整到0.5甚至社区版默认的0.6,减少spill的次数,提升性能。HBO参数修改方案v2

spark.memory.storageFraction

存储内存占(存储+执行)内存的比例

0.5

  1. 保持默认值即可

spark.driver.memory

driver使用内存大小

10G

  1. 一般不需要更改此设置。

  2. 确实需要有大表广播的,可以考虑增加这个数值

spark.yarn.driver.memoryOverhead

driver进程的off-heap内存

spark.driver.memory * 0.1,并且不小于384m

  1. 保持默认值即可

mapjoin

spark.sql.autoBroadcastJoinThreshold

当执行join时,小表被广播的阈值。当被设置为-1,则禁用广播。表大小需要从 Hive Metastore 中获取统计信息。该参数设置的过大会对driver和executor都产生压力。

26214400
(25M)

  1. 由于我们的表大部分为ORC压缩格式,解压后的数据量达到3-5倍甚至10倍,所以调大该参数需要注意。

文件输入输出与合并

spark.hadoop.hive.exec.orc.split.strategy

参数控制在读取ORC表时生成split的策略:

BI策略以文件为粒度进行split划分;

ETL策略会将文件进行切分,多个stripe组成一个split;

HYBRID策略当文件的平均大小大于hadoop最大split值(默认256M)时使用ETL策略,否则使用BI策略。

BI模式

  1. 由于读orc文件时默认按文件划分task(BI模式), 有数据倾斜的表(这里的数据倾斜指大量stripe存储于少数文件中)的情况并发可能不够, 影响执行效率. 可以改成ETL模式

  2. 对于一些较大的ORC表,可能其footer较大,ETL策略可能会导致其从hdfs拉取大量的数据来切分split,甚至会导致driver端OOM,因此这类表的读取建议使用BI策略。

spark.hadoop.mapreduce.input.fileinputformat.split.minsize

计算Split划分时的minSize

spark.hadoop.mapreduce.input.fileinputformat.split.maxsize

控制在ORC切分时stripe的合并处理。具体逻辑是,当几个stripe的大小大于spark.hadoop.mapreduce.input.fileinputformat.split.maxsize时,会合并到一个task中处理。可以适当调小该值,如set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=134217728。以此增大读ORC表的并发。

spark.hadoopRDD.targetBytesInPartition

读取输入文件时&最终合并小文件时,每个task读取的数据量

33554432
(32M)

  1. 建议67108864, 如果发现读取文件的task较多,可以适当增大该值。

spark.sql.adaptive.shuffle.targetPostShuffleInputSize

开启spark.sql.adaptive.enabled后,最后一个stage在进行动态合并partition时,会根据shuffle read的数据量除以该参数设置的值来计算合并后的partition数量。所以增大该参数值会减少partition数量,反之会增加partition数量。

67108864

(64M)

  1. 268435456, 可以适当增大或减小。

spark.sql.mergeSmallFileSize

与 hive.merge.smallfiles.avgsize 类似,写入hdfs后小文件合并的阈值。如果生成的文件平均大小低于该参数配置,则额外启动一轮stage进行小文件的合并

未设置,默认不合并小文件

  1. 建议33554432,如果最终文件数仍然较多,可适当调大.。 参考Spark Hive 小文件合并

  2. 如果输入数据量量大,但是最终结果数据量较少时,可以在最后加一个同时结合distribute by操作。如果最终结果数据量本来就较大,没必要加distribute by。

spark.sql.targetBytesInPartitionWhenMerge

与hive.merge.size.per.task 类似,设置额外的合并job的map端输入size.

合并小文件时候,实际的map输入size=max(spark.sql.mergeSmallFileSize, spark.sql.targetBytesInPartitionWhenMerge , spark.hadoopRDD.targetBytesInPartition )

未设置

  1. 建议33554432。合并小文件时候,实际的map输入size=max(spark.sql.mergeSmallFileSize, spark.sql.targetBytesInPartitionWhenMerge , spark.hadoopRDD.targetBytesInPartition )

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version

文件提交算法,MapReduce-4815 详细介绍了 fileoutputcommitter 的原理,version=2 是批量按照目录进行提交,version=1是一个个的按照文件提交。设置 version=2 可以极大的节省文件提交至hdfs的时间,减轻nn压力 。参考http://www.jasongj.com/spark/committer/

2

  1. 保持默认值即可

shuffle

spark.sql.shuffle.partitions

reduce阶段(shuffle read)的数据分区,分区数越多,启动的task越多,同时生成的文件数也会越多。

2000

1. 建议一个partition保持在256mb左右的大小就好。当作业数据较多时,适当调大该值,当作业数据较少时,适当调小以节省资源

2. spark.sql.shuffle.partitions设置过大可能会导致很多reducer同时向一个mapper拉取数据。该mapper由于请求压力过大挂掉或响应缓慢,从而导致fetch failed。

spark.sql.adaptive.enabled

是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行。

true

  1. 保持默认值即可

spark.sql.adaptive.minNumPostShufflePartitions

开启spark.sql.adaptive.enabled后,合并之后最少会生成的分区数

5

  1. 保持默认值即可

spark.sql.adaptive.shuffle.targetPostShuffleInputSize

开启spark.sql.adaptive.enabled后,最后一个stage在进行动态合并partition时,会根据shuffle read的数据量除以该参数设置的值来计算合并后的partition数量。所以增大该参数值会减少partition数量,反之会增加partition数量。

67108864

(64M)

  1. 268435456, 可以适当增大或减小。

spark.sql.adaptive.join.enabled

是否动态调整join算法

false

spark.sql.adaptiveBroadcastJoinThreshold

SortMergeJoin 转 BroadcastJoin 的阈值。

如果不设置该参数,该阈值与spark.sql.autoBroadcastJoinThreshold的值相等

未设置

spark.sql.statistics.fallBackToHdfs

当表的文件大小元数据信息不可能用时回退到hdfs计算表的文件大小,从而决定是否使用map join. 分区表如果读入数据较少也不会优化为BroadcastJoin, 可以通过添加该参数优化:

false

  1. false. 数据

spark.sql.adaptive.allowAdditionalShuffle

是否允许为了优化 Join 而增加 Shuffle。其默认值为 false

false

spark.sql.adaptive.skewedJoin.enabled

是否自动处理 Join 时数据倾斜 (根据前面stage的shuffle write信息操作来动态调整是使用sortMergeJoin还是broadcastJoin)

false

  1. 公司版本没有该参数

spark.sql.adaptive.skewedPartitionMaxSplits

处理一个倾斜 Partition 的 Task 个数上限,默认值为 5

5

  1. 公司版本没有该参数

spark.sql.adaptive.skewedPartitionRowCountThreshold

一个 Partition 被视为倾斜 Partition 的行数下限,也即行数低于该值的 Partition 不会被当作倾斜 Partition 处理。其默认值为 10L * 1000 * 1000 即一千万

10L * 1000 * 1000

  1. 公司版本没有该参数

spark.sql.adaptive.skewedPartitionSizeThreshold

一个 Partition 被视为倾斜 Partition 的大小下限,也即大小小于该值的 Partition 不会被视作倾斜 Partition。其默认值为 64 * 1024 * 1024 也即 64MB

64 * 1024 * 1024

  1. 公司版本没有该参数

spark.sql.adaptive.skewedPartitionFactor

倾斜因子。如果一个 Partition 的大小大于spark.sql.adaptive.skewedPartitionSizeThreshold的同时大于各 Partition 大小中位数与该因子的乘积,或者行数大于spark.sql.adaptive.skewedPartitionRowCountThreshold的同时大于各 Partition 行数中位数与该因子的乘积,则它会被视为倾斜的 Partition

10

  1. 公司版本没有该参数

spark.shuffle.service.enabled

启用外部shuffle服务,这个服务会安全地保存shuffle过程中,executor写的磁盘文件,因此executor即使挂掉也不要紧,必须配合spark.dynamicAllocation.enabled属性设置为true,才能生效,而且外部shuffle服务必须进行安装和启动,才能启用这个属性

true

  1. 保持默认值即可

spark.reducer.maxSizeInFlight

同一时刻一个reducer可以同时拉取的数据量大小

24m

  1. 保持默认值即可。 社区版默认48m, 任务运行较慢时可以调大试试。

spark.reducer.maxReqsInFlight

同一时刻一个reducer可以同时产生的请求数

10

  1. 保持默认值即可。社区版无限制,任务运行较慢时可以适当调大试试

spark.reducer.maxBlocksInFlightPerAddress

同一时刻一个reducer向同一个上游executor最多可以拉取的数据块数

1

  1. 保持默认值即可。社区版无限制,任务运行较慢时可以适当调大试试

spark.shuffle.io.connectionTimeout

客户端超时时间,超过该时间会fetchfailed

120s

  1. 保持默认值即可。

spark.shuffle.io.maxRetries

shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。

3

  1. 保持默认值即可。

spark.shuffle.io.retryWait

每次重试的等待间隔:

5s

spark.task.maxFailures

Task的最大重试次数

4

spark.reducer.maxReqSizeShuffleToMem

shuffle请求的文件块大小 超过这个参数值,就会被强行落盘,防止一大堆并发请求把内存占满。

536870911

  1. 保持默认值即可。社区版默认Long.MaxValue,美团默认512M。

推测执行

spark.speculation

spark推测执行的开关,作用同hive的推测执行

true

  1. 保持默认值即可。

spark.speculation.interval

开启推测执行后,每隔多久通过checkSpeculatableTasks方法检测是否有需要推测式执行的tasks

1000ms

  1. 保持默认值即可。

spark.speculation.quantile

当成功的Task数超过总Task数的spark.speculation.quantile时(社区版默认75%,公司默认99%),再统计所有成功的Tasks的运行时间,得到一个中位数,用这个中位数乘以spark.speculation.multiplier(社区版默认1.5,公司默认3)得到运行时间门限,如果在运行的Tasks的运行时间超过这个门限,则对它启用推测。

0.99

  1. 如果资源充足,可以适当减小spark.speculation.quantile和spark.speculation.multiplier的值

spark.speculation.multiplier

3

谓词下推

spark.sql.orc.filterPushdown

orc格式下的谓词下推开关

true

  1. 保持默认值即可。

park.sql.hive.metastorePartitionPruning

当为true,spark sql的谓语将被下推到hive metastore中,更早的消除不匹配的分区

true

  1. 保持默认值即可。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/653277
推荐阅读
相关标签
  

闽ICP备14008679号