赞
踩
目录
目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎存储一些维度数据,然后在DataStream API中调用MySQL、Hbase、redis客户端去获取到维度数据进行维度扩充。
本案例采用MySQL创建维表,与创建MySQL sink表语法相同。
重启kafka,创建Topic: case_kafka_mysql
写入json格式的数据
{"ts": "20201011","id": 8,"price_amt":211}
在MySQL中创建名为product_dim的表
- CREATE TABLE `product_dim` (
- `id` bigint(11) NOT NULL,
- `coupon_price_amt` bigint(11) DEFAULT NULL,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
向数据表插入如下数据:
- INSERT INTO `product_dim` VALUES (1, 1);
- INSERT INTO `product_dim` VALUES (3, 1);
- INSERT INTO `product_dim` VALUES (8, 1);
在MySQL中创建名为sync_test_3的表
- CREATE TABLE `sync_test_3` (
- `id` bigint(11) NOT NULL AUTO_INCREMENT,
- `ts` varchar(64) DEFAULT NULL,
- `total_gmv` bigint(11) DEFAULT NULL,
- PRIMARY KEY (`id`),
- UNIQUE KEY `uidx` (`ts`) USING BTREE
- ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
- create table flink_test_3 (
- id BIGINT,
- ts VARCHAR,
- price_amt BIGINT,
- proctime AS PROCTIME ()
- )
- with (
- 'connector' = 'kafka',
- 'topic' = 'case_kafka_mysql',
- 'properties.bootstrap.servers' = '127.0.0.1:9092',
- 'properties.group.id' = 'flink_gp_test3',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'json',
- 'json.fail-on-missing-field' = 'false',
- 'json.ignore-parse-errors' = 'true',
- 'properties.zookeeper.connect' = '127.0.0.1:2181/kafka'
- );
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- create table flink_test_3_dim (
- id BIGINT,
- coupon_price_amt BIGINT
- )
- WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
- 'table-name' = 'product_dim',
- 'username' = 'root',
- 'password' = 'Admin',
- 'lookup.max-retries' = '3',
- 'lookup.cache.max-rows' = 1000
- );
WITH参数
参数 | 说明 | 类型 | 备注 |
lookup.cache.max-rows | 指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。 | Integer | 默认情况下,维表Cache是未开启的。 |
lookup.cache.ttl | 指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。 | Duration | 默认情况下,维表Cache是未开启的。你可以设置lookup.cache.max-rows和 lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。 |
lookup.cache.caching-missing-key | 是否缓存空的查询结果。 | Boolean | 参数取值如下: true(默认值):缓存空的查询结果。 false:不缓存空的查询结果。 |
lookup.max-retries | 查询数据库失败的最大重试次数。 | Integer | 默认值为3。 |
- CREATE TABLE sync_test_3 (
- ts string,
- total_gmv bigint,
- PRIMARY KEY (ts) NOT ENFORCED
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
- 'table-name' = 'sync_test_3',
- 'username' = 'root',
- 'password' = 'Admin'
- );
- INSERT INTO sync_test_3
- SELECT
- ts,
- SUM(price_amt - coupon_price_amt) AS total_gmv
- FROM
- (
- SELECT
- a.ts as ts,
- a.price_amt as price_amt,
- b.coupon_price_amt as coupon_price_amt
- FROM
- flink_test_3 as a
- LEFT JOIN flink_test_3_dim FOR SYSTEM_TIME AS OF a.proctime as b
- ON b.id = a.id
- )
- GROUP BY ts;
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
SELECT id, ts, total_gmv FROM sync_test_3;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。