赞
踩
如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。
注意:一定要记住,jar包要放在Flink安装目录的lib文件夹下,而不是放在自己程序的目录下。
<!-- Maven coordinates of the MySQL CDC connector artifact (flink-connector-mysql-cdc), version 1.3.0.
     NOTE(review): the SQL Client jar named in the surrounding text is version 1.0.0;
     confirm whether the two versions are meant to differ. -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.3.0</version>
</dependency>
CREATE TABLE t_wx_source (
    -- Source side: reads MySQL table test.t_wx_source through the mysql-cdc connector.
    id   INT,
    name STRING,
    age  STRING
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = '192.168.1.91',
    'port'          = '3306',
    'database-name' = 'test',
    'table-name'    = 't_wx_source',
    'username'      = 'root',
    'password'      = 'xxxx'
);
CREATE TABLE t_wx_target (
    -- Sink side: writes to MySQL table test.t_wx_target through the jdbc connector.
    id   INT,
    name STRING,
    age  STRING,
    -- The primary key is declared explicitly on the sink table (not enforced by Flink).
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://192.168.1.91:3306/test',
    'table-name' = 't_wx_target',
    'username'   = 'root',
    'password'   = 'xxxx',
    'driver'     = 'com.mysql.cj.jdbc.Driver'
);
特别注意:
1、主键一定要显式加上:
primary key(id) not ENFORCED
2、connector的区别(源表用mysql-cdc,目标表用jdbc):
'connector' = 'mysql-cdc'
'connector' = 'jdbc'
INSERT INTO t_wx_target
-- Writes the rows read from the CDC source table into the JDBC target table.
SELECT
    id,
    name,
    age
FROM t_wx_source;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); TableEnvironment tEnv = StreamTableEnvironment.create(env, settings); List<String> sql = Files.readAllLines(Paths.get(jobRunParam.getSqlPath())); SqlConfig sqlConfig = SqlParser.parseToSqlConfig(sql); //注册自定义的udf setUdf(tEnv, sqlConfig); //设置checkPoint setCheckpoint(env, jobRunParam.getCheckPointParam()); //设置tableConfig TableConfig tableConfig = tEnv.getConfig(); tableConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai")); //加载配置 setConfiguration(tEnv, sqlConfig); //执行ddl callDdl(tEnv, sqlConfig); //执行dml callDml(tEnv, sqlConfig);
/**
 * Copies every key/value pair from the SQL config's map into the table
 * environment's {@code Configuration}, logging each pair as it is applied.
 * Does nothing when {@code sqlConfig} is null or its config map is empty.
 *
 * @param tEnv      table environment whose configuration is updated
 * @param sqlConfig parsed SQL config supplying the key/value pairs
 * @author zhuhuipei
 * @date 2020-06-23
 * @time 00:46
 */
private static void setConfiguration(TableEnvironment tEnv, SqlConfig sqlConfig) {
    if (sqlConfig == null) {
        return;
    }
    if (MapUtils.isEmpty(sqlConfig.getMapConfig())) {
        return;
    }

    Configuration target = tEnv.getConfig().getConfiguration();
    sqlConfig.getMapConfig().forEach((key, value) -> {
        log.info("#############setConfiguration#############\n {} {}", key, value);
        target.setString(key, value);
    });
}
/**
 * Executes each DDL statement from the SQL config, in list order, on the
 * given table environment. Every statement is printed to stdout and logged
 * before it is executed. Does nothing when {@code sqlConfig} or its DDL
 * list is null.
 *
 * @param tEnv      table environment used to execute the statements
 * @param sqlConfig parsed SQL config holding the DDL list
 */
private static void callDdl(TableEnvironment tEnv, SqlConfig sqlConfig) {
    if (sqlConfig == null) {
        return;
    }
    if (sqlConfig.getDdlList() == null) {
        return;
    }

    for (String statement : sqlConfig.getDdlList()) {
        // Echo to stdout as well as to the logger.
        System.out.println("#############ddl############# \n" + statement);
        log.info("#############ddl############# \n {}", statement);
        tEnv.executeSql(statement);
    }
}
/**
 * Executes each DML statement from the SQL config, in list order, on the
 * given table environment. Every statement is printed to stdout and logged
 * before it is executed. Does nothing when {@code sqlConfig} or its DML
 * list is null.
 *
 * NOTE(review): each statement goes through its own {@code executeSql} call;
 * if several INSERTs are meant to run as one job, confirm whether a
 * statement set should be used instead.
 *
 * @param tEnv      table environment used to execute the statements
 * @param sqlConfig parsed SQL config holding the DML list
 */
private static void callDml(TableEnvironment tEnv, SqlConfig sqlConfig) {
    if (sqlConfig == null) {
        return;
    }
    if (sqlConfig.getDmlList() == null) {
        return;
    }

    for (String statement : sqlConfig.getDmlList()) {
        // Echo to stdout as well as to the logger.
        System.out.println("#############dml############# \n" + statement);
        log.info("#############dml############# \n {}", statement);
        tEnv.executeSql(statement);
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。