赞
踩
本文通过一个小案例，记录 Flink SQL lookup join 的用法，其中 MySQL 表作为 lookup join 的维表。
-- Create the source table `show_log` in MySQL.
-- Flink reads this table through the mysql-cdc connector.
DROP TABLE IF EXISTS `show_log`;
CREATE TABLE `show_log` (
  `log_id`    int(11)      NOT NULL,
  `timestamp` datetime     DEFAULT NULL,
  `user_id`   varchar(255) DEFAULT NULL,
  PRIMARY KEY (`log_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Seed data. The explicit column list keeps the insert correct if columns are
-- later added or reordered. log_id is written as an integer literal to match
-- its int type instead of relying on implicit string-to-int coercion.
INSERT INTO `show_log` (`log_id`, `timestamp`, `user_id`) VALUES
  (1, '2021-11-01 00:01:03', 'a'),
  (2, '2021-11-01 00:03:00', 'b'),
  (3, '2021-11-01 00:05:00', 'c'),
  (4, '2021-11-01 00:06:00', 'b'),
  (5, '2021-11-01 00:07:00', 'c');
-- Create the dimension table `user_profile` in MySQL.
-- Flink queries it through the JDBC connector during the lookup join.
DROP TABLE IF EXISTS `user_profile`;
CREATE TABLE `user_profile` (
  `user_id` varchar(255) NOT NULL,
  `age`     varchar(255) DEFAULT NULL,  -- age bracket stored as text, e.g. '18-24'
  `sex`     varchar(255) DEFAULT NULL,
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Seed data. The explicit column list keeps the insert correct if columns are
-- later added or reordered.
INSERT INTO `user_profile` (`user_id`, `age`, `sex`) VALUES
  ('a', '12-18', '男'),
  ('b', '18-24', '女'),
  ('c', '18-24', '男');
-- Flink source table over MySQL `flink.show_log`, read with the mysql-cdc
-- connector using the 'initial' startup mode.
-- `proctime` is the processing-time attribute that the lookup join references
-- in FOR SYSTEM_TIME AS OF.
-- NOTE(review): `timestamp` is a computed column filled from Flink's
-- CURRENT_TIMESTAMP, so the MySQL `timestamp` column is never read; confirm
-- this is intended.
-- NOTE(review): incremental snapshot is disabled, presumably because no
-- PRIMARY KEY is declared here; verify against the mysql-cdc version in use.
CREATE TABLE show_log (
    log_id      BIGINT,
    `timestamp` AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    user_id     STRING,
    proctime    AS PROCTIME()
) WITH (
    'connector'                         = 'mysql-cdc',
    'hostname'                          = 'localhost',
    'port'                              = '3306',
    'username'                          = 'xx',
    'password'                          = 'xxx',
    'database-name'                     = 'flink',
    'table-name'                        = 'show_log',
    'scan.startup.mode'                 = 'initial',
    'scan.incremental.snapshot.enabled' = 'false'
);
-- Flink dimension table backed by MySQL `flink.user_profile` through the JDBC
-- connector; it is used as the lookup side of the join.
-- Looked-up rows are cached: at most 100 rows, each kept for 6000 s. Changes
-- made in MySQL may therefore be seen late by the join.
-- NOTE(review): newer Flink releases deprecate 'lookup.cache.max-rows' and
-- 'lookup.cache.ttl' in favour of 'lookup.cache' = 'PARTIAL' plus the
-- 'lookup.partial-cache.*' options; verify against the Flink version in use.
CREATE TABLE user_profile (
    user_id STRING,
    age     STRING,
    sex     STRING
) WITH (
    'connector'             = 'jdbc',
    'url'                   = 'jdbc:mysql://localhost:3306/flink?characterEncoding=UTF8',
    'table-name'            = 'user_profile',
    'username'              = 'xx',
    'password'              = 'xx',
    'lookup.cache.max-rows' = '100',
    'lookup.cache.ttl'      = '6000s'
);
-- Print sink: rows written here are printed by the print connector, and the
-- output can be inspected in the Flink web UI.
-- INSERT ... SELECT fills these columns by position, so the column order
-- below is the order a writing query must produce.
CREATE TABLE sink_table (
    log_id      BIGINT,
    `timestamp` TIMESTAMP(3),
    user_id     STRING,
    proctime    TIMESTAMP(3),
    age         STRING,
    sex         STRING
) WITH ('connector' = 'print');
-- Enrich each show_log row with the user's profile through a processing-time
-- lookup join, then write the result to the print sink.
-- LEFT JOIN keeps log rows whose user_id has no profile; age and sex are then NULL.
-- The SELECT list must follow sink_table's column order (..., age, sex).
-- INSERT ... SELECT maps columns by position, not by alias, so listing sex
-- before age would silently write each value into the other column.
INSERT INTO sink_table
SELECT
    s.log_id      AS log_id
  , s.`timestamp` AS `timestamp`
  , s.user_id     AS user_id
  , s.proctime    AS proctime
  , u.age         AS age
  , u.sex         AS sex
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
    ON s.user_id = u.user_id;
如果任务运行过程中 TaskManager 日志出现 GC 相关报错，可以查看 TaskManager 的 metrics 信息，观察相关内存指标是否已被打满；如果是，调整对应配置文件中的参数后重启集群。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。