赞
踩
尽量少使用对象,因为每个对象都有对象头、引用等额外信息,很占用内存空间.
尽量少使用HashMap, LinkedList等,这些类型会使用一些内部类封装集合元素,如: Map.Entry.
尽量少使用字符串,虽然字符串的运行很快,但是它的内部都有字符数组及长度等额外信息,所以也是比较消耗内存的.
优先使用原始数据类型来替代字符串,如:Int,Long等.
代码中如果有String拼接成Array[String]等操作, 建议优先使用aggregateByKey算子,此算子更灵活且更加节省内存
- 不建议使用reduceByKey的原因也是为了减少内存使用, reduceByKey只能对相同数据结构进行聚合, 即数据结构需要转为Array[String]
- 如果使用reduceByKey则需要在mapPartitions处将每一条数据的结构都改为Array[String],增加了内存的消耗和GC次数.
- 使用aggragateByKey更加灵活, 先在预聚合时将String转为Array[String], 降低内存使用.
代码详见: 浅谈Spark groupBy、reduceByKey与aggregateByKey,解决频繁Full GC问题
repartition + sortByKey进行了两次Shuffle,效率较慢,而repartitionAndSortWithinPartitions算子只进行了一次Shuffle,相比上面的操作要快上许多,但是之前搜网上实例很少,这个算子也是琢磨了很久,算是spark中较为复杂的算子了.
代码详见:浅谈Spark repartitionAndSortWithinPartitions
这种情况发生的原因如果是每个partition数据量不均匀导致的, 可通过repartition操作来解决.
如果前一步就有hashpartition等操作,可以避免repartition,只需要在分区修改重分区的逻辑即可.
代码示例可参考: 记录解决HashMap与HashPartition中数据量过大发生Hash冲突问题
3. 当需要连接mysql , redis等数据库时, 优先使用mapPartitions,只需在每个分区中连接一次,即创建一次数据库连接,避免了像map操作每条数据都反复连接数据库的情况.也减少了数据库的压力
4. 如果是最简单的key,value对换等操作,不建议使用mapPartitions算子,这样反而增加了计算量.直接使用map算子即可
和mapPartitions替代map类似,不同的是,foreachPartitiions 和 foreach是没有返回值的,但是mapPartitions 和 map是有返回值的
在Join之前,将不需要的数据剔除,否则只会增加shuffle数据量
如果两个大表进行Join,其中小一点的表不超过1G,可以将这个小一点的表BroadCast出去,将reduceJoin转换为mapJoin,避免了Shuffle,提升Join效率.
如果小表Join大表,同样可直接将小表broadCast出去,避免了Shuffle.
广播实际上是从Driver或者其他Executor节点上远程拉取一份数据放到本地Executor内存中。
这样的话每个Executor内存中就只会保留一份广播变量副本,后面就不用再走Shuffle
val accSkewBroadCast: Broadcast[Dataset[Row]] = sparkSession.sparkContext.broadcast(accSkewDf)
val broadCastValue = accSkewBroadCast.value
如果想将表broadCast出去,可尝试下面这种broadcast方式:
broadCastValue.createOrReplaceTempView("broadcast_table")
sparkSession.catalog.cacheTable("broadcast_table")
在进行Join或者取数据时,底层会将between and操作转化为>= & <=,然后再进行过滤,直接使用>= & <=可以提升一丢丢效率.
如果觉得数据发生了倾斜(数据分布不均,很多数据都打到了一个分区中),建议在Join之前先进行Repartition操作,并且增加分区数,这样可以将分区内的数据重新打散,避免了某个分区内数据过多导致Task卡很久的情况.
如果发现发生数据倾斜的key对业务影响并不是很大,可以考虑直接将这些key过滤掉,这样就避免了数据倾斜.但是通常情况下并不允许这样操作.
最简单粗暴的方式,在我们使用shuffle算子的时候,设置一下并行度.增加并行度就将每个分区内的数据分发到了多个分区中,减小了倾斜数据分区的压力,速度运行速度就会增加.
通常用来处理reduceByKey或者sql中group by进行分组聚合情况
这种一般是针对聚合类shuffle操作
上面有提到,此处不再赘述
假设: leftRDD Left Join rightRDD
df.select("key","view_name", "view_value", "valid_feaids")
// 数据采样, 0.1代表采样10%,可以自定义
.sample(false, 0.1)
.rdd
.map(k => (k, 1))
// 统计 key 出现的次数
.reduceBykey(_ + _)
// 过滤出数量大于指定数量的Key, 可以自定义
.filter(_._2 >= 20000)
// 根据 key 出现次数进行排序
.map(k => (k._2, k._1))
// false是倒排, true是顺序排
.sortByKey(false)
// 取前 N 个
.take(1000)
每个spark任务申请多少executor进行来执行.这个需要看资源情况,如果资源足够,且任务数据量较大,可多设置一些,反之酌情减少.
num-executors: 100
每个executor进程的内存,直接决定了我们spark任务的速率,如果平时代码中出现OOM,则需要看下executor memory是否设置的过小.
executor-memory: 10G
设置每个executor进程的cpu core数量.每一个core同一时间只能执行一个task进程,这个参数设置的越多,那么就能更快地执行完所有任务.一般建议2-3个即可.
executor-cores
driver端进程的内存,默认值为1g,通常不需要设置就足够.如果使用collect算子时,则需要driver端内存足够大,否则会出现OOM,此时建议调大内存.
driver-memory
Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%,可适当提高shuffle计算内存比例,如调整到70%
"spark.shuffle.memoryFraction": "0.7"
spark任务重RDD持久化数据在Executor内存中占用比例为60%,当数据量较大内存放不下时,就会溢写到磁盘,如果spark任务中有较多需要持久化的RDD,建议调大此参数,避免内存不足时数据只能写磁盘的情况.若没有或者发现作业频繁gc或者运行较慢时,则可适当调小此比例
"spark.storage.memoryFraction": "0.3"
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:ParallelGCThreads=3"
当数据量过大时,executor负载压力比较大,通信有时候会出现问题.会有以下问题
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
Executor heartbeat timed out after xxx ms
因为spark中默认交互时间为120s,经常会报错,此时就需要提高网络交互时间.
"spark.executor.heartbeatInterval": "3000000"
"spark.network.timeout": "1200000"
"spark.storage.blockManagerSlaveTimeoutMs": "10000000"
spark中stage失败后重试次数,默认值为3,可以适当增加此次数.避免由于FULL GC、网络不稳定等情况下造成拉取数据失败的问题
spark.shuffle.io.maxRetries
默认失败后重试等待时间是: 5s,可以设置为60s.增加shuffle稳定性
spark.shuffle.io.retryWait
这个参数很重要,不设置的话一般会影响到任务性能.
这个一般根据数据量定,如果数据量过大,建议设置多一些,如5000. 如果数据量不大,可适当减少,如500~1000.
一般建议设置该参数为num-executors * executor-cores的2~3倍较为合适
如: executor为:200, 每个executor core为2个, 则task数设置为1000是合理的.
spark.default.parallelism: 5000
尽量减少select的次数,如果能够一次解决,就不要select多次,每次select都会遍历一次子集数据,非常消耗资源与内存.
必要时,建议使用SQL方式来替代dataframe操作,看起来更加直观,很多时候也避免了dataframe的各种复杂操作.
在使用distinct时候,经常会出现卡死的情况,因为distinct只需要找到不同的值,它会读取所有的数据记录,然后使用一个全局的reduce任务来去重,极容易造成数据倾斜.
而group by有分组聚合运算等操作.做的远比distinct要多,会有多个reduce任务并行处理,每一个reduce都处理一部分数据,然后进行聚合操作.效率远比distinct要高.
distinct:
select count(distinct a.uid) uv,name,age
from A
group by name,age
group by:
select count(uid) uv,name,age
from (
select uid,name,age
from A
group by uid,name,age
) a
group by name,age
如果代码中重复使用到了某个RDD或DataFrame,可将其cache下来,否则每次使用这个RDD / DataFrame时, Spark都会每次从头计算这个RDD / DataFrame,非常消耗资源且浪费时间.
cache以后必须紧接着action算子(count等操作),否则cache无效.然后再接着其他操作
val cachedDF = testDF.cache()
cachedDF.count()
cachedDF.join(xxx)
cachedDF.map(xxx)
以上操作的话就使用了cache的DataFrame,不必每次从头计算testDF
accDf.unpersist(true)
accRDD.unpersist(true)
sparkSession.catalog.clearCache()
IOUtils.unpersistRdds(context.spark.sparkContext)
def unpersistRdds(sc: SparkContext): Unit = {
// 获得所有持久化的RDD,并进行指定释放
val rdds = sc.getPersistentRDDs
rdds.filter(_._2.name != null)
.filter(_._2.name.contains("rdd"))
.foreach(_._2.unpersist())
}
context.spark.catalog.dropTempView("temp")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。