使用Starrocks引擎中的窗口函数 row_number() over( )对10亿的数据集进行去重操作,BE内存溢出问题频发(忘记当时指定的BE内存上限是多少了.....),此时才意识到,开窗操作,如果使用 不当,反而更容易引发性能问题。 下文是对Hive中的窗口函数底层源码进行初步学习,若有问题,请指正~
由于窗口函数的返回结果不是一个聚合值,而是另一张表的格式(table-in, table-out),因此Hive社区引入分区表函数 Partitioned Table Function(PTF)。
当遇到窗口函数时,会生成PTFOperator,PTFOperator依赖PTFInvocation 读取已经排好序的数据,创建相应的输入分区:PTFPartition inputPart;
WindowTableFunction 负责管理窗口帧、调用窗口函数(UDAF)、并将结果写入输出分区: PTFPartition outputPart。
重写process(Object row, int tag) 方法,该方法来处理一行数据Row
- @Override
- public void process(Object row, int tag) throws HiveException {
- if (!isMapOperator) {
- /*
- * check if current row belongs to the current accumulated Partition:
- * - If not:
- * - process the current Partition
- * - reset input Partition
- * - set currentKey to the newKey if it is null or has changed.
- */
- newKeys.getNewKey(row, inputObjInspectors[0]);
- //会判断当前row所属的Key(newKeys)是否等于当前正在累积数据的partition所属的key(currentKeys)
- boolean keysAreEqual = (currentKeys != null && newKeys != null) ?
- newKeys.equals(currentKeys) : false;
- // 如果不相等,就结束当前partition分区的数据累积,触发窗口计算
- if (currentKeys != null && !keysAreEqual) {
- // 关闭正在积累的分区
- ptfInvocation.finishPartition();
- }
- // 如果currentKeys为空或者被改变,就将newKeys赋值给currentKeys
- if (currentKeys == null || !keysAreEqual) {
- // 开启一个新的分区partition
- ptfInvocation.startPartition();
- if (currentKeys == null) {
- currentKeys = newKeys.copyKey();
- } else {
- currentKeys.copyKey(newKeys);
- }
- }
- } else if (firstMapRow) { // 说明当前row是进入的第一行
- ptfInvocation.startPartition();
- firstMapRow = false;
- }
- // 将数据row添加到分区中,积累数据
- ptfInvocation.processRow(row);
- }

PTFInvocation是PTFOperator类 的内部类
- @Override
- protected void initializeOp(Configuration jobConf) throws HiveException {
- ...
- ptfInvocation = setupChain();
- ptfInvocation.initializeStreaming(jobConf, isMapOperator);
- ...
- }
它的主要作用是负责PTF 数据链中行( row)的流动,通过 ptfInvocation.processRow(row) 方法调用传递链中的每一行,并且通过ptfInvocation.startPartition()、ptfInvocation.finishPartition()方法来通知分区何时开始何时结束。
- PTFPartition inputPart; // inputPart理解为:分区对象,一直是在复用一个inputPart
- TableFunctionEvaluator tabFn; // tabFn理解为:窗口函数的实例
- //向分区中添加一行数据
- void processRow(Object row) throws HiveException {
- if (isStreaming()) {
- // tabFn是窗口函数的实例
- handleOutputRows(tabFn.processRow(row));
- } else {
- // inputPart就是当前正在累积数据的分区
- inputPart.append(row);
- }
- }
- // 开启一个分区
- void startPartition() throws HiveException {
- if (isStreaming()) {
- tabFn.startPartition();
- } else {
- if (prev == null || prev.isOutputIterator()) {
- if (inputPart == null) {
- // 创建新分区对象:PTFPartition对象
- createInputPartition();
- } else {
- // 重置分区
- inputPart.reset();
- }
- }
- }
- if (next != null) {
- next.startPartition();
- }
- }
- // 关闭一个分区
- void finishPartition() throws HiveException {
- if (isStreaming()) {
- handleOutputRows(tabFn.finishPartition());
- } else {
- if (tabFn.canIterateOutput()) {
- outputPartRowsItr = inputPart == null ? null :
- tabFn.iterator(inputPart.iterator());
- } else {
- // tabFn是窗口函数的实例,execute方法:执行窗口函数逻辑的计算,返回outputPart依旧是一个分区对象
- outputPart = inputPart == null ? null : tabFn.execute(inputPart);
- outputPartRowsItr = outputPart == null ? null : outputPart.iterator();
- }
- if (next != null) {
- if (!next.isStreaming() && !isOutputIterator()) {
- next.inputPart = outputPart;
- } else {
- if (outputPartRowsItr != null) {
- while (outputPartRowsItr.hasNext()) {
- next.processRow(outputPartRowsItr.next());
- }
- }
- }
- }
- if (next != null) {
- next.finishPartition();
- } else {
- if (!isStreaming()) {
- if (outputPartRowsItr != null) {
- while (outputPartRowsItr.hasNext()) {
- // 将窗口函数计算结果逐条输出到下一个Operator中
- forward(outputPartRowsItr.next(), outputObjInspector);
- }
- }
- }
- }
- }

- private final PTFRowContainer<List<Object>> elems; // 存放数据的容器
- public void append(Object o) throws HiveException {
- //在往PTFPartition中添加数据时,如果当前累计条数超过了Int最大值(21亿),会抛异常。
- if (elems.rowCount() == Integer.MAX_VALUE) {
- throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition",
- Integer.MAX_VALUE));
- }
- @SuppressWarnings("unchecked")
- List<Object> l = (List<Object>)
- ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE);
- elems.addRow(l);
- }
- public abstract class TableFunctionEvaluator {
- transient protected PTFPartition outputPartition; // transient瞬态变量,该属性可以不参与序列化
- // iPart理解为:分区对象
- public PTFPartition execute(PTFPartition iPart)
- throws HiveException {
- if (ptfDesc.isMapSide()) {
- return transformRawInput(iPart);
- }
- PTFPartitionIterator<Object> pItr = iPart.iterator();
- PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc.getLlInfo(), pItr);
- if (outputPartition == null) {
- outputPartition = PTFPartition.create(ptfDesc.getCfg(),
- tableDef.getOutputShape().getSerde(),
- OI, tableDef.getOutputShape().getOI());
- } else {
- outputPartition.reset();
- }
- // 入参1:输入PTFPartition转换的迭代器;入参2:输出PTFPartition
- execute(pItr, outputPartition);
- return outputPartition;
- }
- protected abstract void execute(PTFPartitionIterator<Object> pItr, PTFPartition oPart) throws HiveException;
- }

抽象方法 execute(PTFPartitionIterator pItr, PTFPartition oPart) 方法的具体实现在子类WindowingTableFunction中
- public class WindowingTableFunction extends TableFunctionEvaluator {
- @Override
- public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException {
- ArrayList<List<?>> oColumns = new ArrayList<List<?>>();
- PTFPartition iPart = pItr.getPartition();
- StructObjectInspector inputOI = iPart.getOutputOI();
- WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
- for (WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
- // 这里是判断逻辑:如果该窗口定义是一个从第一行到最后一行的全局无限窗口就返回false,反之true
- boolean processWindow = processWindow(wFn.getWindowFrame());
- pItr.reset();
- if (!processWindow) {
- Object out = evaluateFunctionOnPartition(wFn, iPart);
- if (!wFn.isPivotResult()) {
- out = new SameList(iPart.size(), out);
- }
- oColumns.add((List<?>) out);
- } else {
- oColumns.add(executeFnwithWindow(wFn, iPart));
- }
- }
- /*
- * Output Columns in the following order
- * - the columns representing the output from Window Fns
- * - the input Rows columns
- */
- for (int i = 0; i < iPart.size(); i++) {
- ArrayList oRow = new ArrayList();
- Object iRow = iPart.getAt(i);
- for (int j = 0; j < oColumns.size(); j++) {
- oRow.add(oColumns.get(j).get(i));
- }
- for (StructField f : inputOI.getAllStructFieldRefs()) {
- oRow.add(inputOI.getStructFieldData(iRow, f));
- }
- //最终将处理好的数据逐条添加到输出PTFPartition中
- outP.append(oRow);
- }
- }
- // Evaluate the function result for each row in the partition
- ArrayList<Object> executeFnwithWindow(
- WindowFunctionDef wFnDef,
- PTFPartition iPart)
- throws HiveException {
- ArrayList<Object> vals = new ArrayList<Object>();
- for (int i = 0; i < iPart.size(); i++) {
- // 入参:1.窗口函数、2.当前行的行号、3.输入PTFPartition对象
- Object out = evaluateWindowFunction(wFnDef, i, iPart);
- vals.add(out);
- }
- return vals;
- }
- // Evaluate the result given a partition and the row number to process
- private Object evaluateWindowFunction(WindowFunctionDef wFn, int rowToProcess, PTFPartition partition)
- throws HiveException {
- BasePartitionEvaluator partitionEval = wFn.getWFnEval()
- .getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI(), nullsLast);
- // 给定当前行,获取窗口的聚合
- return partitionEval.iterate(rowToProcess, ptfDesc.getLlInfo());
- }
- }

注:WindowingTableFunction类中的execute方法 ,没怎么理解清楚,待补充~
window Funtion的使用语法:
- select
- col1,
- col2,
- row_number() over (partition by col1 order by col2 窗口子句) as rn
- from tableA
windows函数部分即是:在窗口上执行的函数。主要有count 、sum、avg聚合类窗口函数、还有常用的row_number、rank这样的排序函数。
即为: over里面的三部分内容(均可省略不写)
partition by 分区
order by 排序
(rows | range )between ... and ..... 窗口子句
ps :Hive 窗口函数的详细介绍:
(07)Hive——窗口函数详解_hive 窗口函数-CSDN博客
窗口函数的实现,主要借助 Partitioned Table Function (即PTF);
- select
- id,
- sq,
- cell_type,
- rank,
- row_number() over(partition by id order by rank ) as rn ,
- rank() over(partition by id order by rank) as r,
- dense_rank() over(partition by cell_type order by id) as dr
- from window_test_table
- group by
- id,
- sq,
- cell_type,
- rank;
计算除窗口函数以外所有的其他运算,如:group by,join ,having等。上面的代码的第一阶段即为:
- select
- id,
- sq,
- cell_type,
- rank
- from window_test_table
- group by
- id,
- sq,
- cell_type,
- rank;
将第一步的输出作为第一个 PTF 的输入,计算对应的窗口函数值。上面代码的第二阶段即为:
- select
- id,
- sq,
- cell_type,
- rank,
- rn,
- r
- from
- window(
- <w>,--将第一阶段输出记为w
- partition by id, --分区
- order by rank, --窗口函数的order
- [rn:row_number(),r:rank()] --窗口函数调用
- )
由于row_number(),rank() 两个函数对应的窗口是相同的(partition by id order by rank),因此,这两个函数可以在一次shuffle中完成。
将第二步的输出结果作为 第二个PTF 的输入,计算对应的窗口函数值。上面代码的第三阶段即为:
- select
- id,
- sq,
- cell_type,
- rank,
- rn,
- r,
- dr
- from
- window(
- <w1>,--将第二阶段输出记为w1
- partition by cell_type, --分区
- order by id, --窗口函数的order
- [dr:dense_rank()] --窗口函数调用
- )
总结:上述代码显示需要shuffle三次才能得到最终的结果(第一阶段的group by ,第二阶段,第三阶段的开窗操作)。对应到MapReduce程序,即需要经历三次 map->reduce组合;对应到spark sql上,需要Exchange三次,再加上中间排序操作,在数据量很大的情况下,效率上确实会有较大的影响。
在使用Hive进行数据处理时,借助窗口函数可以对数据进行分组、排序等操作,但是在使用row_number这类窗口函数时,会遇到性能较慢的问题,j即比普通的聚合函数( sum,min,max等)运行成本更高,为啥?
(1)开窗函数不能做预聚合 ,数据量很多,shuffle慢,计算慢,并且会有
(2)开窗多一步order by ,更耗时间;
(1)普通的聚合函数语句,可以根据函数不同,采用partial + merge 的方式运行,也即是map端预聚合;但那是window 窗口语句只能在reduce 端一次性聚合,即只有complete 执行模式。
(3)window语句作用于 对行,并为每行返回一个聚合结果,这决定了window在执行过程中需要更大的buffer 进行汇总。
例如:假设需要求出历史至今用户粒度末次交易的sku名称或者交易金额等,这种情况下,可以将 交易时间和sku名称拼接起来,取max ,之后再将sku名称拆解开,即能达到预期效果。
在Hive 中,row_number是一个常用的窗口函数,用于为结果集中的每一行分配一个唯一的数字。通常会搭配over子句来指定窗口的范围和排序方式。例如:
- select
- col1,
- col2,
- row_number() over (partition by col1 order by col2 窗口子句) as rn
- from tableA
上述示例row_number 函数将根据col1进行分组,并按照col2的值进行排序,为每一组数据分配一个唯一的行号。然而,在处理大规模数据时,使用row_number可能会导致性能下降,这是因为row_number 需要对数据进行排序和标记,而这些操作在大数据量下会消耗较多的计算资源。
注: 以下都是row_number() over () 开窗函数性能优化的几种方式:
ps: 这种方式在生产环境中用过。
在使用row_number时,尽量避免多次排序操作。可以将row_number 函数应用在子查询中,然后再进行排序操作,避免重复的排序过程。
- select
- col1,
- col2,
- rn
- from
- ( select
- col1,
- col2,
- row_number() over (partition by col1 order by col2) as rn
- from tableA) tmp1
- order by col1,col2;
