当前位置:   article > 正文

Flink流批一体计算(24):Flink SQL之mysql维表实时关联_flink如何读取mysql多个表关联度的数据

flink如何读取mysql多个表关联度的数据

目录

1.维表

2.数据准备

创建源数据

创建维度表

创建Sink表

3.配置任务

Flink SQL创建kafka源表

Flink SQL创建MySQL维表

Flink SQL创建MySQL结果表

编写计算任务

核验数据


1.维表

目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎存储一些维度数据,然后在DataStream API中调用MySQL、Hbase、redis客户端去获取到维度数据进行维度扩充。

本案例采用MySQL创建维表,与创建MySQL sink表语法相同。

2.数据准备

创建源数据

重启kafka,创建Topic:  case_kafka_mysql

写入json格式的数据

  {"ts": "20201011","id": 8,"price_amt":211}

创建维度表

在MySQL中创建名为product_dim的表

  1. CREATE TABLE `product_dim` (
  2.   `id` bigint(11) NOT NULL,
  3.   `coupon_price_amt` bigint(11) DEFAULT NULL,
  4.   PRIMARY KEY (`id`)
  5. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

向数据表插入如下数据:

  1. INSERT INTO `product_dim` VALUES (1, 1);
  2. INSERT INTO `product_dim` VALUES (3, 1);
  3. INSERT INTO `product_dim` VALUES (8, 1);
创建Sink表

在MySQL中创建名为sync_test_3的表

  1. CREATE TABLE `sync_test_3` (
  2.   `id` bigint(11) NOT NULL AUTO_INCREMENT,
  3.   `ts` varchar(64) DEFAULT NULL,
  4.   `total_gmv` bigint(11) DEFAULT NULL,
  5.   PRIMARY KEY (`id`),
  6.   UNIQUE KEY `uidx` (`ts`) USING BTREE
  7. ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

3.配置任务

Flink SQL创建kafka源表
  1. create table flink_test_3 (
  2.   id BIGINT,
  3.   ts VARCHAR,
  4.   price_amt BIGINT,
  5.   proctime AS PROCTIME ()
  6. )
  7.  with (
  8.     'connector' = 'kafka',
  9.     'topic' = 'case_kafka_mysql',
  10.     'properties.bootstrap.servers' = '127.0.0.1:9092',
  11.     'properties.group.id' = 'flink_gp_test3',
  12.     'scan.startup.mode' = 'earliest-offset',
  13.     'format' = 'json',
  14.     'json.fail-on-missing-field' = 'false',
  15.     'json.ignore-parse-errors' = 'true',
  16.     'properties.zookeeper.connect' = '127.0.0.1:2181/kafka'
  17.   );
Flink SQL创建MySQL维表
  1. create table flink_test_3_dim (
  2.   id BIGINT,
  3.   coupon_price_amt BIGINT
  4. )
  5. WITH (
  6.    'connector' = 'jdbc',
  7.    'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
  8.    'table-name' = 'product_dim',
  9.    'username' = 'root',
  10.    'password' = 'Admin',
  11.    'lookup.max-retries' = '3',
  12.    'lookup.cache.max-rows' = 1000
  13.  );

WITH参数

参数

说明

类型

备注

lookup.cache.max-rows

指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。

Integer

默认情况下,维表Cache是未开启的。

lookup.cache.ttl

指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。

Duration

默认情况下,维表Cache是未开启的。你可以设置lookup.cache.max-rows lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。

lookup.cache.caching-missing-key

是否缓存空的查询结果。

Boolean

参数取值如下:

true(默认值):缓存空的查询结果。

false:不缓存空的查询结果。

lookup.max-retries

查询数据库失败的最大重试次数。

Integer

默认值为3

Flink SQL创建MySQL结果表
  1. CREATE TABLE sync_test_3 (
  2.                    ts string,
  3.                    total_gmv bigint,
  4.                    PRIMARY KEY (ts) NOT ENFORCED
  5.  ) WITH (
  6.    'connector' = 'jdbc',
  7.    'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
  8.    'table-name' = 'sync_test_3',
  9.    'username' = 'root',
  10.    'password' = 'Admin'
  11.  );
编写计算任务
  1. INSERT INTO sync_test_3
  2. SELECT
  3.   ts,
  4.   SUM(price_amt - coupon_price_amt) AS total_gmv
  5. FROM
  6.   (
  7.     SELECT
  8.       a.ts as ts,
  9.       a.price_amt as price_amt,
  10.       b.coupon_price_amt as coupon_price_amt
  11.     FROM
  12.       flink_test_3 as a
  13.       LEFT JOIN flink_test_3_dim  FOR SYSTEM_TIME AS OF  a.proctime  as b
  14.      ON b.id = a.id
  15.   )
  16. GROUP BY ts;
核验数据
SELECT id, ts, total_gmv FROM sync_test_3;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/598320
推荐阅读
相关标签
  

闽ICP备14008679号