
Flink Table & SQL ROW_NUMBER Deduplication


Deduplication in Flink SQL using ROW_NUMBER.

Syntax

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr_col [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

Description

  • ROW_NUMBER(): Assigns each row within a partition a unique, sequential number starting from 1.

  • PARTITION BY col1[, col2...]: Specifies the partition columns, i.e. the deduplication keys.

  • ORDER BY time_attr_col [asc|desc]: Specifies the ordering column. Currently only the proctime time attribute is supported; rowtime is not yet supported. Ordering asc keeps the first record, ordering desc keeps the last record.
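
For example, to keep the first record per key you would order ascending on the processing-time attribute; the full code example below keeps the last record by ordering DESC instead. A minimal sketch following the syntax above, written against the source_kafka table defined later in this article:

SELECT userID, eventType, eventTime, productID
FROM (
   SELECT *,
     ROW_NUMBER() OVER (PARTITION BY userID, eventType, eventTime
       ORDER BY proctime ASC) AS rownum
   FROM source_kafka)
WHERE rownum = 1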

Notes:

  1. Two levels of query are required:

    • The inner query uses the ROW_NUMBER() OVER window function to rank the rows within each partition (specified by PARTITION BY) according to the ordering column (specified by ORDER BY), producing rownum.

    • The outer query keeps only the top-ranked row.

  2. The outer query's WHERE clause must contain a condition such as rownum = 1 so that Flink recognizes the query as a deduplication operation.

  3. The query produces retractions.

  4. Only the Blink planner is supported.

Deduplication with ROW_NUMBER

Test Data

// Hand-made test data, as follows:
// A user clicks a product at a certain point in time
{"userID":"user_1","eventType":"click","eventTime":"2015-01-01 01:00:00","productID":"product_1"}

Code Example

package com.bigdata.flink.deduplication;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Author: Wang Pei
 * Summary:
 * RowNumber Deduplication
 */
public class RowNumberDeduplication {
    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

        // Source DDL
        String sourceDDL = ""
                + "create table source_kafka "
                + "( "
                + "    userID String, "
                + "    eventType String, "
                + "    eventTime String, "
                + "    productID String, "
                + "    proctime AS PROCTIME() "
                + ") with ( "
                + "    'connector.type' = 'kafka', "
                + "    'connector.version' = '0.10', "
                + "    'connector.properties.bootstrap.servers' = 'kafka01:9092', "
                + "    'connector.properties.zookeeper.connect' = 'kafka01:2181', "
                + "    'connector.topic' = 'test_1', "
                + "    'connector.properties.group.id' = 'c1_test_1', "
                + "    'connector.startup-mode' = 'latest-offset', "
                + "    'format.type' = 'json' "
                + ")";
        tableEnv.sqlUpdate(sourceDDL);
        //tableEnv.toAppendStream(tableEnv.from("source_kafka"), Row.class).print();

        // Deduplication Query
        // Keep the last record per key
        String execSQL = ""
                + "SELECT userID, eventType, eventTime, productID  "
                + "FROM ( "
                + "  SELECT *, "
                + "     ROW_NUMBER() OVER (PARTITION BY userID, eventType, eventTime ORDER BY proctime DESC) AS rownum "
                + "  FROM source_kafka "
                + ") t "
                + "WHERE rownum = 1";
        Table resultTable = tableEnv.sqlQuery(execSQL);
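        // The deduplication result is updated as new records arrive, so a retract stream
        // (add/retract flag plus row) is needed rather than an append stream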
        tableEnv.toRetractStream(resultTable, Row.class).print();

        tableEnv.execute(RowNumberDeduplication.class.getSimpleName());
    }
}

Results

// Feed the following records into Kafka one at a time
{"userID":"user_1","eventType":"click","eventTime":"2015-01-01 01:00:00","productID":"product_1"}
{"userID":"user_1","eventType":"click","eventTime":"2015-01-01 01:00:00","productID":"product_2"}
{"userID":"user_1","eventType":"click","eventTime":"2015-01-01 01:00:00","productID":"product_3"}

// Output
(true,user_1,click,2015-01-01 01:00:00,product_1)
(false,user_1,click,2015-01-01 01:00:00,product_1)
(true,user_1,click,2015-01-01 01:00:00,product_2)
(false,user_1,click,2015-01-01 01:00:00,product_2)
(true,user_1,click,2015-01-01 01:00:00,product_3)
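
In the output above, the boolean flag marks inserts (true) and retractions (false): each new record with the same key retracts the previously emitted row and then emits the new one. If downstream only needs the current deduplicated rows, one option is to filter the retract stream on that flag; a minimal sketch that would replace the print call in the code example:

tableEnv.toRetractStream(resultTable, Row.class)
        .filter(change -> change.f0) // keep inserts (true), drop retractions (false)
        .print();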