赞
踩
一、哪些情况会产生数据倾斜以及解决方案
在分布式计算中,数据倾斜是指数据在分布式节点中的不平衡分布,导致某些节点的负载过重,从而影响整个任务的运行效率和性能。对于Spark来说,数据倾斜是一个常见的问题,可能会导致任务运行时间过长、资源浪费、节点宕机等问题。
1、shuffle的时候,如果这个时候shuffle的字段为空,会出现数据倾斜。
解决方案:将空字段进行过滤
2、key有很多,分区设置过少,导致很多key聚集在一个分区出现的数据倾斜
解决方案:局部聚合 + 全局聚合
局部聚合–> 将分组字段的值加上一个随机数[加盐]
3、当某一个表中某一个key数据特别多,然后使用group by就会出现数据倾斜
解决方案:将小表广播出去,有原来的reduce join 变成 map join ,避免shuffle操作,从而解决数据倾斜
4、大表 join 小表 ,这两个表中有某一个key或者几个key数据比较多,会出现数据倾斜
5、大表 join 大表 ,其中某一个表分布比较均匀,另个一表存在某一个或者某几个key数据特别多也会出现数据倾斜
解决方案:将容易产生数据倾斜的key过滤出来,单独处理[加盐]扩容;没有数据倾斜的key的数据照常处理(如果导致倾斜key特别多的话,比如成千上万个key都导致数据倾斜,这种方法就不适用了,需要结合其它方法一起解决)
6、大表 join 大表 ,其中某一个表分布比较均匀,另一个表存在很多key数据特别多,也会出现数据倾斜
解决方案:直接将产生数据倾斜的表进行加盐[0-9],对另一个表进行扩容[最多扩容10倍]
二、缺失值处理
缺失值处理有两种方式,丢弃和填充
1、什么是缺失值,列的值为NaN、null、" “、”,“、“Null” 等等
2、缺失值处理的框架:df.na
3、缺失值处理
删除:API df.na.drop()
删除策略 any 只要一行中有一列值为NaN就删除该行 ;all 一行中所有的列值全部为NaN才删除;针对指定列删除 df.na,drop(“any”,List(“列名1”,“列名2”)),一行的列名1和列名2这两列有一列数据为NaN,就删除改行
填充 API df.na.fill(”")
替换 API df.na.repelace(“列名”,Map(“列值”,“替换值”))
三、调优
1、增加资源(shell提交任务时候资源调整)
executor-memory 每个executor内存大小
executor-cores 每个executor使用的CPU核数
num-executor 启动的executor的数量
/usr/local/spark-current/bin/spark-submit \
--master yarn \
--deploy-mode client \
--executor-memory 2G \
--driver-memory 1G \
--queue root.default \
--class my.Application \
--conf spark.ui.port=4052 \
--conf spark.port.maxRetries=100 \
--num-executors 2 \
--jars test1-spark-connector_2.11-2.3.1.jar \
--conf spark.memory.fraction=0.6 \
2、增加并行度
spark core spark.default.parallelism
sparksql spark.sql.shuffle.partitions
3、广播变量
spark core spark.sparkcontext.broadcast
spark sql spark.sql.autoBroadcastJoinThreshold 要广播出去的小表限制
4、RDD的重用
缓存persist或Cache
5、调整内存占比
spark.memory.fraction
spark.memory.storageFraction
spark.shuffle.io.maxRetries
spark.shuffle.io.retryWait
6、推测机制
spark.speculation
spark.speculation.multiplier
7、补充 Spark GC
内存分为两块(青年代和老年代)
青年代中又分为三块(Eden、survivor0、survivor1)
Eden中存放的是初始化对象,随着时间推移,这一块区域满了以后会进行minor GC,不用的清楚掉,然后使用的放到survivor0 中
Eden中清空以后又可以存放对象了,然后再次存满以后再次GC,直到survivor0中也满了,GC会 把Eden和survive0中使用的对象放在survive1中,不用的清除
重复这个步骤
Survive1中也存储满了以后,会把使用的对象放在老年代中,不使用的清除掉
当老年代中也存满了以后,这个时候就会进行full GC ,这个时候时间短几分钟,长的话十几分钟, 在这个过程中会暂停一切操作,优先GC,这个时候可能会出现拉取shuffle数据失败
应用行为 | 属性名 | 默认值 | 属性描述 |
---|---|---|---|
driver行为 | spark.driver.cores | 1 | driver程序运行需要的cpu内核数 |
driver行为 | spark.driver.maxResultSize | 1G | 每个Spark action(如collect)所有分区的序列化结果的总大小限制。设置的值应该不小于1m,0代表没有限制。如果总大小超过这个限制,程序将会终止。大的限制值可能导致driver出现内存溢出错误(依赖于spark.driver.memory和JVM中对象的内存消耗) |
driver行为 | spark.driver.memory | 1G | driver进程使用的内存数 |
driver行为 | spark.driver.memoryOverhead | driverMemory * 0.10,with minimum of 384 | driver端分配的堆外内存 |
driver行为 | spark.driver.extraClassPath | None | 附加到driver的classpath的额外的classpath实体 |
driver行为 | spark.driver.defaultJavaOptions | None | 默认传递给driver的JVM选项字符串。注意这个配置不能直接在代码中使用SparkConf来设置,因为这个时候driver JVM已经启动了,可以在命令行通过–driver-java-options参数来设置 |
driver行为 | spark.driver.extraJavaOptions | None | 传递给driver的JVM选项字符串 |
driver行为 | spark.driver.extraLibraryPath | None | 指定启动driver的JVM时用到的库路径。注意,在这个选项中设置Spark属性或者堆大小是不合法的。Spark属性需要用–driver-class-path设置 |
driver行为 | spark.driver.userClassPathFirst | false | 当在driver中加载类时,是否用户添加的jar比Spark自己的jar优先级高。 |
executor行为 | spark.executor.memory | 1G | 每个executor进程使用的内存数 |
executor行为 | spark.executor.memoryOverhead | executorMemory * 0.10, with minimum of 384 | xecutor JVM堆外内存设置,用于解决JVM开销,内部字符串,其他本机开销等问题 |
executor行为 | spark.executor.extraClassPath | None | 附加到executors的classpath的额外的classpath实体,一般不用 |
executor行为 | spark.executor.defaultJavaOptions | None | 默认的JVM选项,以附加到spark.executor.extraJavaOptions |
executor行为 | spark.executor.extraJavaOptions | None | 传递给executors的JVM选项字符串。例如GC设置或者其它日志设置。注意,在这个选项中设置Spark属性或者堆大小是不合法的。Spark属性需要用SparkConf对象或者spark-submit脚本用到的spark-defaults.conf文件设置。堆内存可以通过spark.executor.memory设置 |
executor行为 | spark.executor.extraLibraryPath | None | 指定启动executor的JVM时用到的库路径 |
executor行为 | spark.executor.userClassPathFirst | false | 与spark.driver.userClassPathFirst相同的功能 |
executor行为 | spark.executor.cores | 1 | 每个executor使用的核数 |
executor行为 | spark.default.parallelism | 本地模式:机器核数;Mesos:8;其他:max(executor的core,2) | 默认并行度 |
shuffle行为 | spark.reducer.maxSizeInFlight | 48m | 从每个reduce中获取的最大容量,该参数值如果过低时,会导致Shuffle过程中产生的数据溢出到磁盘 |
shuffle行为 | spark.reducer.maxReqsInFlight | Int.MaxValue | 此配置限制了获取块的远程请求的数量 |
shuffle行为 | spark.reducer.maxBlocksInFlightPerAddress | Int.MaxValue | 该配置限制了reduce任务从其他机器获取远程块的数量 |
shuffle行为 | spark.shuffle.compress | true | 是否压缩map操作的输出文件 |
shuffle行为 | spark.shuffle.file.buffer | 32k | 每个shuffle文件输出缓存的大小 |
shuffle行为 | spark.shuffle.io.maxRetries | 3 | (Netty only)自动重试次数 |
shuffle行为 | spark.shuffle.io.numConnectionsPerPeer | 1 | (Netty only)机器之间的连接复用 |
shuffle行为 | spark.shuffle.io.preferDirectBufs | true | (Netty only)直接堆外内存,用于减少随机和高速缓存块传输期间的GC |
shuffle行为 | spark.shuffle.io.retryWait | 5s | (Netty only)重试提取之间要等待多长时间;默认情况下重试导致的最大延迟为15s |
shuffle行为 | spark.shuffle.service.enabled | false | 启用外部shuffle服务 |
shuffle行为 | spark.shuffle.service.index.cache.size | 100m | 缓存条目限制为指定的内存占用,以字节为单位 |
shuffle行为 | spark.shuffle.sort.bypassMergeThreshold | 200 | 如果shuffle map task的数量小于这个阀值200,且不是聚合类的shuffle算子(比如reduceByKey),则不会进行排序 |
shuffle行为 | spark.shuffle.spill.compress | true | 在shuffle时,是否将spilling的数据压缩。压缩算法通过spark.io.compression.codec指定 |
shuffle行为 | spark.shuffle.accurateBlockThreshold | 100 * 1024 * 1024 | 高于该阈值时,HighlyCompressedMapStatus中的混洗块的大小将被准确记录。通过避免在获取随机块时低估随机块的大小,有助于防止OOM |
shuffle行为 | spark.shuffle.registration.timeout | 5000 | 注册到外部shuffle服务的超时时间 |
shuffle行为 | spark.shuffle.registration.maxAttempts | 3 | 注册到外部shuffle服务的重试次数 |
压缩序列化 | spark.broadcast.compress | true | 是否压缩广播变量 |
压缩序列化 | spark.checkpoint.compress | false | 是否开启RDD压缩checkpoint |
压缩序列化 | spark.io.compression.codec | lz4 | RDD压缩方式org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, org.apache.spark.io.SnappyCompressionCodec, and org.apache.spark.io.ZStdCompressionCodec. |
压缩序列化 | spark.io.compression.lz4.blockSize | 32k | LZ4压缩中使用的块大小 |
压缩序列化 | spark.io.compression.snappy.blockSize | 32k | Snappy压缩中使用的块大小 |
压缩序列化 | spark.kryo.classesToRegister | None | 如果你用Kryo序列化,给定的用逗号分隔的自定义类名列表表示要注册的类 |
压缩序列化 | spark.kryo.registrator | None | 如果你用Kryo序列化,设置这个类去注册你的自定义类。如果你需要用自定义的方式注册你的类,那么这个属性是有用的。否则spark.kryo.classesToRegister会更简单。它应该设置一个继承自KryoRegistrator的类 |
压缩序列化 | spark.kryo.registrationRequired | false | 是否需要注册为Kyro可用 |
压缩序列化 | spark.kryoserializer.buffer.max | 64m | Kryo序列化缓存允许的最大值 |
压缩序列化 | spark.kryoserializer.buffer | 64k | Kyro序列化缓存的大小 |
压缩序列化 | spark.rdd.compress | False | 是否压缩序列化的RDD分区 |
压缩序列化 | spark.serializer | org.apache.spark.serializer.JavaSerializer | 序列化对象使用的类 |
动态分配 | spark.dynamicAllocation.enabled | false | 是否开启动态分配 |
动态分配 | spark.dynamicAllocation.executorIdleTimeout | 60s | 当某个executor空间超过该值时,则会remove掉该executor |
动态分配 | spark.dynamicAllocation.cachedExecutorIdleTimeout | infinity | 当executor内有缓存数据并且空闲了该值后,则remove掉该executor |
动态分配 | spark.dynamicAllocation.initialExecutors | spark.dynamicAllocation.minExecutors | 初始executor数量,默认和executor数量一样 |
动态分配 | spark.dynamicAllocation.maxExecutors | infinity | executor上限,默认无限制 |
动态分配 | spark.dynamicAllocation.minExecutors | 0 | executor下限,默认是0个 |
动态分配 | spark.dynamicAllocation.executorAllocationRatio | 1 | 默认情况下,动态分配将要求足够的执行者根据要处理的任务数量最大化并行性。虽然这可以最大程度地减少作业的等待时间,但是对于小型任务,此设置可能会由于执行程序分配开销而浪费大量资源,因为某些执行程序甚至可能无法执行任何工作。此设置允许设置一个比率,该比率将用于减少执行程序的数量。完全并行。默认为1.0以提供最大的并行度。0.5将执行者的目标数量除以2由dynamicAllocation计算的执行者的目标数量仍然可以被spark.dynamicAllocation.minExecutors和spark.dynamicAllocation.maxExecutors设置覆盖 |
动态分配 | spark.dynamicAllocation.schedulerBacklogTimeout | 1 | 如果启用了动态分配,并且有待解决的任务积压的时间超过了此期限,则将请求新的执行者。 |
动态分配 | spark.dynamicAllocation.sustainedSchedulerBacklogTimeout | schedulerBacklogTimeout | 与spark.dynamicAllocation.schedulerBacklogTimeout相同,但仅用于后续执行程序请求 |
动态分配 | spark.dynamicAllocation.shuffleTracking.enabled | false | 实验功能。为执行程序启用随机文件跟踪,从而无需外部随机服务即可动态分配。此选项将尝试保持为活动作业存储随机数据的执行程序 |
动态分配 | spark.dynamicAllocation.shuffleTracking.timeout | infinity | 启用随机跟踪时,控制保存随机数据的执行程序的超时。默认值意味着Spark将依靠垃圾回收中的shuffle来释放执行程序。 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。