当前位置:   article > 正文

flink1.13 sql基础语法(二)join操作_flink look up join

flink look up join

(7) DML 之 join 操作

分类:
⭐ Regular Join:流与流的 Join,包括 Inner Equal Join、Outer Equal Join
⭐ Interval Join:流与流的 Join,两条流一段时间区间内的 Join
⭐ Temporal Join:流与流的 Join,包括事件时间,处理时间的 Temporal Join,类似于离线中的快照 Join
⭐ Lookup Join:流与外部维表的 Join
⭐ Array Expansion:表字段的列转行,类似于 Hive 的 explode 数据炸开的列转行
⭐ Table Function:自定义函数的表字段的列转行,支持 Inner Join 和 Left Outer Join

1、Regular Join

流与流的 Join,包括 Inner Equal Join、Outer Equal Join

⭐ Inner Join(Inner Equal Join):流任务中,只有两条流 Join 到才输出,输出 +[L, R]
⭐ Left Join(Outer Equal Join):流任务中,左流数据到达之后,无论有没有 Join 到右流的数据,都会输出(Join 到输出 +[L, R],没 Join 到输出 +[L, null]),
如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null],然后输出 +[L, R]
⭐ Right Join(Outer Equal Join):有 Left Join 一样,左表和右表的执行逻辑完全相反
⭐ Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出
(对右流来说:Join 到输出 +[L, R],没 Join 到输出 +[null, R];对左流来说:Join 到输出 +[L, R],没 Join 到输出 +[L, null])。
如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流
(左流数据到达为例:回撤 -[null, R],输出 +[L, R],右流数据到达为例:回撤 -[L, null],输出 +[L, R])。

案例:曝光日志关联点击日志筛选既有曝光又有点击的数据,并且补充点击的扩展参数(show inner click)

-- 曝光日志数据
CREATE TABLE show_log_table (
    log_id BIGINT,
    show_params STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '2',
  'fields.show_params.length' = '1',
  'fields.log_id.min' = '1',
  'fields.log_id.max' = '100'
);

-- 点击日志数据
CREATE TABLE click_log_table (
  log_id BIGINT,
  click_params     STRING
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '2',
  'fields.click_params.length' = '1',
  'fields.log_id.min' = '1',
  'fields.log_id.max' = '10'
);

CREATE TABLE sink_table (
    s_id BIGINT,
    s_params STRING,
    c_id BIGINT,
    c_params STRING
) WITH (
  'connector' = 'print'
);


-- 流的 INNER JOIN,条件为 log_id
INSERT INTO sink_table
SELECT
    show_log_table.log_id as s_id,
    show_log_table.show_params as s_params,
    click_log_table.log_id as c_id,
    click_log_table.click_params as c_params
FROM show_log_table
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
关于 Regular Join 的注意事项:

⭐ 实时 Regular Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;
非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联

⭐ Join 的流程是左流新来一条数据之后,会和右流中符合条件的所有数据做 Join,然后输出。

⭐ 流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
(2) Interval Join

Interval Join:流与流的 Join,两条流一段时间区间内的 Join。Interval Join 可以让一条流去 Join 另一条流中前后一段时间内的数据

Regular Join 会产生回撤流,但是在实时数仓中一般写入的 sink 都是类似于 Kafka 这样的消息队列,然后后面接 clickhouse 等引擎,这些引擎又不具备处理回撤流的能力。 Interval Join 就是用于消灭回撤流的。

⭐ Interval Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):

⭐ Inner Interval Join:流任务中,只有两条流 Join 到(满足 Join on 中的条件:两条流的数据在时间区间 + 满足其他等值条件)才输出,输出 +[L, R]
⭐ Left Interval Join:流任务中,左流数据到达之后,如果没有 Join 到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 Join 到,
则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间)。如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,
然后输出 +[L, null],如果右流 State 中的数据过期了,就直接从 State 中删除。

⭐ Right Interval Join:和 Left Interval Join 执行逻辑一样,只不过左表和右表的执行逻辑完全相反

⭐ Full Interval Join:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),
如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出 +[L, R]。
事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出 +[L, null],右流过期输出 -[null, R]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

实际案例:曝光日志关联点击日志筛选既有曝光又有点击的数据,条件是曝光关联之后发生 4 小时之内的点击,并且补充点击的扩展参数(show inner interval click):

INSERT INTO sink_table
SELECT
    show_log_table.log_id as s_id,
    show_log_table.show_params as s_params,
    click_log_table.log_id as c_id,
    click_log_table.click_params as c_params
FROM show_log_table 
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '4' HOUR AND click_log_table.row_time;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
关于 Interval Join 的注意事项:

⭐ 实时 Interval Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;
非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,然后将满足条件的数据进行关联输出
  • 1
  • 2
  • 3
  • 4
(3) Temporal Join(快照 Join)

Temporal Join 中,Versioned Table 其实就是对同一条 key(在 DDL 中以 primary key 标记同一个 key)的历史版本(根据时间划分版本)做一个维护,当有明细表 Join 这个表时,
可以根据明细表中的时间版本选择 Versioned Table 对应时间区间内的快照数据进行 join。

比如常见的汇率数据(实时的根据汇率计算总金额),在 12:00 之前(事件时间),人民币和美元汇率是 7:1,在 12:00 之后变为 6:1,那么在 12:00 之前数据就要按照 7:1 进行计算,
12:00 之后就要按照 6:1 计算。

在事件时间语义的任务中,事件时间 12:00 之前的数据,要按照 7:1 进行计算,12:00 之后的数据,要按照 6:1 进行计算。这其实就是离线中快照的概念,
维护具体汇率的表在 Flink SQL 体系中就叫做 Versioned Table。

Verisoned Table:Verisoned Table 中存储的数据通常是来源于 CDC 或者会发生更新的数据。Flink SQL 会为 Versioned Table 维护 Primary Key 下的所有历史时间版本的数据。

PRIMARY KEY 定义方式:

-- 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time,
    -- PRIMARY KEY 定义方式
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
   'connector' = 'kafka',
   'value.format' = 'debezium-json',
   /* ... */
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
(4) Lookup Join(维表 Join)

上面说的这几种 Join 都是流与流之间的 Join,而 Lookup Join 是流与 Redis,Mysql,HBase 这种存储介质的 Join。Lookup 的意思就是实时查找,
而实时的画像数据一般都是存储在 Redis,Mysql,HBase 中,这就是 Lookup Join 的由来

实际案例:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。

-- lookup join 的 query 逻辑
INSERT INTO sink_table
SELECT 
    s.log_id as log_id
    , s.`timestamp` as `timestamp`
    , s.user_id as user_id
    , s.proctime as proctime
    , u.sex as sex
    , u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

注意:
实时的 lookup 维表关联能使用 处理时间 去做关联

所有的维表性能问题都可以总结为:高 qps 下访问维表存储引擎产生的任务背压,数据产出延迟问题。

那么当然,解决方案也是有很多的。抛开 Flink SQL 想一下,如果我们使用 DataStream API,常用的优化方案有哪些?这里列举一下:

⭐1、 按照 redis 维表的 key 分桶 + local cache:通过按照 key 分桶的方式,让大多数据的维表关联的数据访问走之前访问过得 local cache 即可。
这样就可以把访问外部存储 2.1 ms 处理一个 query 变为访问内存的 0.1 ms 处理一个 query 的时长。

⭐2、异步访问外存:DataStream api 有异步算子,可以利用线程池去同时多次请求维表外部存储。
这样就可以把 2.1 ms 处理 1 个 query 变为 2.1 ms 处理 10 个 query。吞吐可变优化到 10 / 2.1 ms = 4761 qps。

⭐3、 批量访问外存:除了异步访问之外,我们还可以批量访问外部存储。
举一个例子:在访问 redis 维表的 1 query 占用 2.1 ms 时长中,其中可能有 2 ms 都是在网络请求上面的耗时 ,其中只有 0.1 ms 是 redis server 处理请求的时长。
那么我们就可以使用 redis 提供的 pipeline 能力,在客户端(也就是 flink 任务 lookup join 算子中),攒一批数据,使用 pipeline 去同时访问 redis sever。
这样就可以把 2.1 ms 处理 1 个 query 变为 7ms(2ms + 50 * 0.1ms) 处理 50 个 query。吞吐可变为 50 query / 7 ms = 7143 qps。

最好用的是 1 + 3,2 相比 3 还是一条一条发请求,性能会差一些。

(5)Array Expansion(数组列转行)

将表中 ARRAY 类型字段(列)拍平,转为多行
实际案例:比如某些场景下,日志是合并、攒批上报的,就可以使用这种方式将一个 Array 转为多行。

CREATE TABLE show_log_table (
    log_id BIGINT,
    show_params ARRAY<STRING>
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.log_id.min' = '1',
  'fields.log_id.max' = '10'
);

CREATE TABLE sink_table (
    log_id BIGINT,
    show_param STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO sink_table
SELECT
    log_id,
    t.show_param as show_param
FROM show_log_table
-- array 炸开语法
CROSS JOIN UNNEST(show_params) AS t (show_param)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

(6) Table Function(自定义列转行)

Table Function 本质上是个 UDTF 函数,和离线 Hive SQL 一样,我们可以自定义 UDTF 去决定列转行的逻辑

Table Function 使用分类:
⭐ Inner Join Table Function:如果 UDTF 返回结果为空,则相当于 1 行转为 0 行,这行数据直接被丢弃
⭐ Left Join Table Function:如果 UDTF 返回结果为空,折行数据不会被丢弃,只会在结果中填充 null 值

/**
 * 自定义列转行
 */
public class _07_TableFunctionInnerJoin {

    public static void main(String[] args) throws Exception {

        StreamTableEnvironment tabEnv = FlinkSqlUtils.getTabEnv();

        String sql = "CREATE FUNCTION user_profile_table_func AS 'com.yyds.flink_sql.ddl_create"
                + "._07_TableFunctionInnerJoin$UserProfileTableFunction';\n"
                + "\n"
                + "CREATE TABLE source_table (\n"
                + "    user_id BIGINT NOT NULL,\n"
                + "    name STRING,\n"
                + "    row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n"
                + "    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n"
                + ") WITH (\n"
                + "  'connector' = 'datagen',\n"
                + "  'rows-per-second' = '10',\n"
                + "  'fields.name.length' = '1',\n"
                + "  'fields.user_id.min' = '1',\n"
                + "  'fields.user_id.max' = '10'\n"
                + ");\n"
                + "\n"
                + "CREATE TABLE sink_table (\n"
                + "    user_id BIGINT,\n"
                + "    name STRING,\n"
                + "    age INT,\n"
                + "    row_time TIMESTAMP(3)\n"
                + ") WITH (\n"
                + "  'connector' = 'print'\n"
                + ");\n"
                + "\n"
                + "INSERT INTO sink_table\n"
                + "SELECT user_id,\n"
                + "       name,\n"
                + "       age,\n"
                + "       row_time\n"
                + "FROM source_table,\n"
                // Table Function Join 语法对应 LATERAL TABLE
                + "LATERAL TABLE(user_profile_table_func(user_id)) t(age)";

        Arrays.stream(sql.split(";"))
                .forEach(tabEnv::executeSql);
    }

    public static class UserProfileTableFunction extends TableFunction<Integer> {

        public void eval(long userId) {
            // 自定义输出逻辑
            if (userId <= 5) {
                // 一行转 1 行
                collect(1);
            } else {
                // 一行转 3 行
                collect(1);
                collect(2);
                collect(3);
            }
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/474219
推荐阅读
相关标签
  

闽ICP备14008679号