赞
踩
使用flink版本1.11.1,使用flink cdc功能实时获取MySQL数据变化并计算指标,最后使用insert语句sink到MySQL。
在1.11版本之前,sink表建表是这样的:
CREATE TABLE zktest(
in int,
name string,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://xxx/zktest?useSSL=false&autoReconnect=true&useUnicode=true&characterEncoding=utf-8',
'connector.table' = 'zktest',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'xxx',
'connector.password' = 'xxx',
'connector.write.flush.max-rows' = '1',--默认每5000条数据写入一次
'connector.write.flush.interval' = '1s',--写入时间间隔
'connector.write.max-retries' = '20' --最大重试次数
);
在flink1.11之后,社区优化了建表语句,更加简洁了
CREATE TABLE zktest(
in int,
name string,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xxx/zktest?useSSL=false&autoReconnect=true&useUnicode=true&characterEncoding=utf-8',
'table-name' = 'zktest',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'xxx',
'password' = 'xxx',
'lookup.cache.max-rows' = '1',
'lookup.cache.ttl' = '1s',
'lookup.max-retries' = '3'
);
本来新版连接参数不加后面三个也是默认一条一刷新的,今天测试时在1.11中用了旧版的连接参数,还忘了加后面三个参数,实时数据只有几条时不入库,但是在select时是可以实时更新的,关闭flink任务后数据又入库了。
还有需要注意的一个问题,如果id是用的联合主键,并且是将多个字段concat到一起的话,如果个别字段为空,concat结果也会为null,可以在concat之前对null值做处理,或者设置set table.exec.sink.not-null-enforcer=drop;直接将key为null的值过滤掉
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。