当前位置:   article > 正文

flink-mysql-cdc_flink-connector-mysql-cdc

flink-connector-mysql-cdc

MAVEN

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可
一定要记住,是flink,而不是自己的程序下面

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

运行json

mysql-cdc 表1(作为source):

create table t_wx_source(
id INT, 
name String, 
age String
) 
with ( 
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.91',
'port' = '3306', 
'database-name' = 'test',
'table-name' = 't_wx_source',
'username' = 'root', 
'password' = 'xxxx');

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

mysql jdbc表(作为sink表):

create table t_wx_target( 
id INT, 
name String, 
age String, 
primary key(id) not ENFORCED
) with ( 
'connector' = 'jdbc', 
'url' = 'jdbc:mysql://192.168.1.91:3306/test', 
'table-name' = 't_wx_target', 
'username' = 'root', 
'password' = 'xxxx', 
'driver' = 'com.mysql.cj.jdbc.Driver'
);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

特别注意:
1、主键展示一定要加上

primary key(id) not ENFORCED
  • 1

1、connector的区别

'connector' = 'mysql-cdc'
  • 1
'connector' = 'jdbc'
  • 1

操作语句insert(update/delete)

insert into t_wx_target select id,name,age from t_wx_source;
  • 1

java代码

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();

            TableEnvironment tEnv = StreamTableEnvironment.create(env, settings);


            List<String> sql = Files.readAllLines(Paths.get(jobRunParam.getSqlPath()));

            SqlConfig sqlConfig = SqlParser.parseToSqlConfig(sql);


            //注册自定义的udf
            setUdf(tEnv, sqlConfig);

            //设置checkPoint
            setCheckpoint(env, jobRunParam.getCheckPointParam());

            //设置tableConfig
            TableConfig tableConfig = tEnv.getConfig();
            tableConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"));


            //加载配置
            setConfiguration(tEnv, sqlConfig);


            //执行ddl
            callDdl(tEnv, sqlConfig);

            //执行dml
            callDml(tEnv, sqlConfig);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
/**
 * 设置Configuration
 *
 * @author zhuhuipei
 * @date 2020-06-23
 * @time 00:46
 */
private static void setConfiguration(TableEnvironment tEnv, SqlConfig sqlConfig) {
    if (sqlConfig == null || MapUtils.isEmpty(sqlConfig.getMapConfig())) {
        return;
    }
    Configuration configuration = tEnv.getConfig().getConfiguration();
    for (Map.Entry<String, String> entry : sqlConfig.getMapConfig().entrySet()) {
        log.info("#############setConfiguration#############\n  {} {}", entry.getKey(), entry.getValue());
        configuration.setString(entry.getKey(), entry.getValue());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
private static void callDdl(TableEnvironment tEnv, SqlConfig sqlConfig) {
    if (sqlConfig == null || sqlConfig.getDdlList() == null) {
        return;
    }

    for (String ddl : sqlConfig.getDdlList()) {
        System.out.println("#############ddl############# \n" + ddl);
        log.info("#############ddl############# \n {}", ddl);
        tEnv.executeSql(ddl);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
private static void callDml(TableEnvironment tEnv, SqlConfig sqlConfig) {
    if (sqlConfig == null || sqlConfig.getDmlList() == null) {
        return;
    }
    for (String dml : sqlConfig.getDmlList()) {
        System.out.println("#############dml############# \n" + dml);
        log.info("#############dml############# \n {}", dml);
        tEnv.executeSql(dml);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/594782
推荐阅读
相关标签
  

闽ICP备14008679号