Deduplication in Flink SQL with ROW_NUMBER.
SELECT [column_list]
FROM (
    SELECT [column_list],
        ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
            ORDER BY time_attr_col [asc|desc]) AS rownum
    FROM table_name)
WHERE rownum = 1
Explanation
ROW_NUMBER(): assigns each row within a partition a unique, consecutive number, starting from 1.
PARTITION BY col1[, col2...]: specifies the partition columns, i.e. the deduplication keys.
ORDER BY time_attr_col [asc|desc]: specifies the ordering column, which must be a time attribute. Currently only proctime is supported; rowtime is not yet supported. Ordering asc keeps the first row per key, desc keeps the last row (see the keep-first sketch after this list).
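To make the asc/desc distinction concrete, here is a minimal keep-first sketch. It reuses the source_kafka table defined in the full example further below and only flips the sort direction relative to the keep-last query there; the variable name keepFirstSQL is illustrative:

// Keep the FIRST record per key: ascending order on the processing-time attribute
String keepFirstSQL = ""
        + "SELECT userID, eventType, eventTime, productID "
        + "FROM ( "
        + "  SELECT *, "
        + "    ROW_NUMBER() OVER (PARTITION BY userID, eventType, eventTime ORDER BY proctime ASC) AS rownum "
        + "  FROM source_kafka "
        + ") t "
        + "WHERE rownum = 1";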
Notes:
Two levels of query are required:
The inner query uses the ROW_NUMBER() OVER window function to rank the rows within each partition (specified via PARTITION BY) according to the ordering column (specified via ORDER BY), producing rownum.
The outer query keeps only the top-ranked row.
In the outer query's WHERE clause, the condition must be written as, e.g., rownum = 1 so that Flink recognizes the query as a deduplication operation.
The query produces retractions (a sketch for consuming the retract stream follows this list).
Only the Blink planner is supported.
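Because the query produces retractions, toRetractStream emits Tuple2<Boolean, Row> messages (true = accumulate, false = retract). A downstream operator that should only see the current results can filter on that flag. A minimal sketch, assuming the tableEnv and resultTable from the full example below:

// Keep only accumulate messages and unwrap the Row payload
tableEnv.toRetractStream(resultTable, Row.class)
        .filter(t -> t.f0)     // true = accumulate, false = retract
        .map(t -> t.f1)        // unwrap Tuple2<Boolean, Row> to Row
        .returns(Row.class)    // type hint, since the lambda's return type is erased
        .print();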
// Hand-crafted test data, as follows:
// a user clicks a product at some point in time
{"userID":"user_1","eventType":"click","eventTime":"2015-01-01 01:00:00","productID":"product_1"}
package com.bigdata.flink.deduplication;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Author: Wang Pei
 * Summary:
 *   RowNumber Deduplication
 */
public class RowNumberDeduplication {
    public static void main(String[] args) throws Exception {

        // Blink planner is required for the deduplication optimization
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

        // Source DDL: Kafka source with a processing-time attribute column
        String sourceDDL = ""
                + "create table source_kafka "
                + "( "
                + "    userID String, "
                + "    eventType String, "
                + "    eventTime String, "
                + "    productID String, "
                + "    proctime AS PROCTIME() "
                + ") with ( "
                + "    'connector.type' = 'kafka', "
                + "    'connector.version' = '0.10', "
                + "    'connector.properties.bootstrap.servers' = 'kafka01:9092', "
                + "    'connector.properties.zookeeper.connect' = 'kafka01:2181', "
                + "    'connector.topic' = 'test_1', "
                + "    'connector.properties.group.id' = 'c1_test_1', "
                + "    'connector.startup-mode' = 'latest-offset', "
                + "    'format.type' = 'json' "
                + ")";
        tableEnv.sqlUpdate(sourceDDL);
        //tableEnv.toAppendStream(tableEnv.from("source_kafka"), Row.class).print();

        // Deduplication Query
        // Keep the last record per key (ORDER BY proctime DESC)
        String execSQL = ""
                + "SELECT userID, eventType, eventTime, productID "
                + "FROM ( "
                + "  SELECT *, "
                + "    ROW_NUMBER() OVER (PARTITION BY userID, eventType, eventTime ORDER BY proctime DESC) AS rownum "
                + "  FROM source_kafka "
                + ") t "
                + "WHERE rownum = 1";
        Table resultTable = tableEnv.sqlQuery(execSQL);
        tableEnv.toRetractStream(resultTable, Row.class).print();

        tableEnv.execute(RowNumberDeduplication.class.getSimpleName());
    }
}
// Feed the following records into Kafka one at a time
{"userID":"user_1","eventType":"click","eventTime":"2015-01-01 01:00:00","productID":"product_1"}
{"userID":"user_1","eventType":"click","eventTime":"2015-01-01 01:00:00","productID":"product_2"}
{"userID":"user_1","eventType":"click","eventTime":"2015-01-01 01:00:00","productID":"product_3"}
// Output
(true,user_1,click,2015-01-01 01:00:00,product_1)
(false,user_1,click,2015-01-01 01:00:00,product_1)
(true,user_1,click,2015-01-01 01:00:00,product_2)
(false,user_1,click,2015-01-01 01:00:00,product_2)
(true,user_1,click,2015-01-01 01:00:00,product_3)
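The boolean flag is the retraction marker from toRetractStream: each new record with the same key (userID, eventType, eventTime) first retracts (false) the previously emitted row and then accumulates (true) the new one, so the last accumulate message per key is the current deduplicated result. For comparison, under the documented keep-first semantics the ASC variant from the earlier sketch should emit only the first record and, since that result never changes, no retractions. A hypothetical trace for the same input:

// Hypothetical output of the keep-first (ORDER BY proctime ASC) variant:
(true,user_1,click,2015-01-01 01:00:00,product_1)
// product_2 and product_3 arrive later with the same key and are dropped.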