赞
踩
本文基于SPARK 3.3.0
该优化是FaceBook(Meta) 内部的优化,还有合并到spark社区。
该优化的主要是partialaggregate的部分:对于类似求count,sum,Avg的聚合操作,会存在现在mapper进行部分聚合的操作,之后在reduce端,再进行FinalAggregate操作。这看起来是没有问题的(能够很好的减少网络IO),但是我们知道对于聚合操作,我们会进行数据的spill的操作,如果在mapper阶段合并的数据很少,以至于抵消不了网络IO带来的消耗的话,这无疑会给任务带来损耗。
利用运行时的指标信息,能够达到比较好的加速效果。
对于ObjectHashAggreate的原理,可以参考深入理解SPARK SQL 中HashAggregateExec和ObjectHashAggregateExec以及UnsafeRow,该文可以比较清楚的解释ObjectHashAggregate和HashAggregate的区别:
使用jvm heap的内存使用情况以及处理的行数来指导什么时候开始spill。
但是这种在数据倾斜的情况下,会增加OOM的风险。
目前SortAggreaget的现状是:
目前在spark 3.3.0增加的功能:
ReplaceHashWithSortAgg
来做替换,当然通过spark.sql.execution.replaceHashWithSortAgg
来开启(默认是关闭的),因为对于任何新特性,在release版本默认都是关闭的,在master分支才是开启的对于Aggregate更多的细节了解可以参考sparksql源码系列 | 一文搞懂with one count distinct 执行原理
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。