当前位置:   article > 正文

记录一下在使用flink sql设置sink表连接参数时遇到的一个问题_autoreconnect=true 无效 flinksql

autoreconnect=true 无效 flinksql

使用flink版本1.11.1,使用flink cdc功能实时获取MySQL数据变化并计算指标,最后使用insert语句sink到MySQL。
在1.11版本之前,sink表建表是这样的:

CREATE TABLE zktest(
in int,
name string,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
 'connector.type' = 'jdbc',
 'connector.url' = 'jdbc:mysql://xxx/zktest?useSSL=false&autoReconnect=true&useUnicode=true&characterEncoding=utf-8',
 'connector.table' = 'zktest',
 'connector.driver' = 'com.mysql.jdbc.Driver',
 'connector.username' = 'xxx',
 'connector.password' = 'xxx',
 'connector.write.flush.max-rows' = '1',--默认每5000条数据写入一次
 'connector.write.flush.interval' = '1s',--写入时间间隔
 'connector.write.max-retries' = '20' --最大重试次数
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在flink1.11之后,社区优化了建表语句,更加简洁了

CREATE TABLE zktest(
in int,
name string,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xxx/zktest?useSSL=false&autoReconnect=true&useUnicode=true&characterEncoding=utf-8',
'table-name' = 'zktest',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'xxx',
'password' = 'xxx',
'lookup.cache.max-rows' = '1',
'lookup.cache.ttl' = '1s',
'lookup.max-retries' = '3'
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

本来新版连接参数不加后面三个也是默认一条一刷新的,今天测试时在1.11中用了旧版的连接参数,还忘了加后面三个参数,实时数据只有几条时不入库,但是在select时是可以实时更新的,关闭flink任务后数据又入库了。

还有需要注意的一个问题,如果id是用的联合主键,并且是将多个字段concat到一起的话,如果个别字段为空,concat结果也会为null,可以在concat之前对null值做处理,或者设置set table.exec.sink.not-null-enforcer=drop;直接将key为null的值过滤掉

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/615127
推荐阅读
相关标签
  

闽ICP备14008679号