Pure practical guide: real-time DML synchronization from MySQL to MySQL/Oracle/...... based on Flink CDC

CDC

First, what is CDC? It is short for Change Data Capture. With CDC we can capture the committed changes from a database and deliver them downstream for consumption. These changes include INSERT, DELETE and UPDATE operations.

Flink SQL CDC: data synchronization and how it works

CDC stands for Change Data Capture. It is a fairly broad concept: anything that can capture changed data can be called CDC. In practice there are two main flavors, query-based CDC and log-based CDC. Query-based CDC polls the source tables periodically, which puts extra load on the source, has higher latency and cannot reliably capture deletes or intermediate states; log-based CDC reads the database's change log (for example the MySQL binlog), so it captures every committed change with low latency and little intrusion on the source database.

Flink CDC documentation: https://ververica.github.io/flink-cdc-connectors/release-2.0/
Flink 1.13 documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/

Enough talk, let's get practical.
Part 1: Using a custom source and sink
1. Example business tables and data sources
Source schema: amir, source table: amirone
Target schema: hmm, target table: amirtwo
2. Maven dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.amir.flink</groupId>
	<artifactId>flinkcdc20</artifactId>
	<version>1.0.0</version>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>8</source>
					<target>8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
	<packaging>jar</packaging>
	<description>this is test</description>
	<properties>
		<java.version>1.8</java.version>
		<fastjson.version>1.2.75</fastjson.version>
		<druid.version>1.2.5</druid.version>
		<flink.version>1.13.1</flink.version>
		<scala.binary.version>2.12</scala.binary.version>
		<HikariCP.version>3.2.0</HikariCP.version>
		<Impala.version>2.6.4</Impala.version>
		<kafka.version>2.8.0</kafka.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-scala_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-blink_2.12</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>com.ververica</groupId>
			<artifactId>flink-connector-mysql-cdc</artifactId>
			<version>2.0.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>com.zaxxer</groupId>
			<artifactId>HikariCP</artifactId>
			<version>${HikariCP.version}</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.13</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.13</artifactId>
			<version>${kafka.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>${kafka.version}</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>${fastjson.version}</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.7.25</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.25</version>
		</dependency>
		<dependency>
			<groupId>com.google.code.gson</groupId>
			<artifactId>gson</artifactId>
			<version>2.8.2</version>
		</dependency>
	</dependencies>

</project>

3. Source and sink; here the sink writes to MySQL

import com.ververica.cdc.connectors.mysql.MySqlSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MySqlSource.<String>builder()
      .hostname("192.168.16.162")
      .port(3306)
      .databaseList("amir") // monitor all tables in the "amir" database
      .username("root")
      .password("123456")
      .deserializer(new JsonDebeziumDeserializationSchema()) // custom schema defined in step 4
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .addSink(new MysqlSink()); // custom sink defined in step 5

    env.execute("mysqlAmirToMysqlHmm");
  }
}
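If you only want to capture one specific table rather than every table in the database, the same builder also accepts a table list. A minimal sketch of that variant for this post's amir.amirone table (not part of the original code):

// Variant of the source above restricted to a single table (sketch, not from the original post).
SourceFunction<String> singleTableSource = MySqlSource.<String>builder()
    .hostname("192.168.16.162")
    .port(3306)
    .databaseList("amir")
    .tableList("amir.amirone")   // table names must be qualified with the database name
    .username("root")
    .password("123456")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();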

4. Custom deserialization class JsonDebeziumDeserializationSchema, which turns the change records emitted by Debezium into JSON strings

import com.google.gson.Gson;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.HashMap;

public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        HashMap<String, Object> hashMap = new HashMap<>();

        // the record topic has the form "<server-name>.<database>.<table>"
        String topic = sourceRecord.topic();
        String[] split = topic.split("[.]");
        String database = split[1];
        String table = split[2];
        hashMap.put("database", database);
        hashMap.put("table", table);

        // operation type
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        // the change record itself
        Struct struct = (Struct) sourceRecord.value();
        Struct after = struct.getStruct("after");
        Struct before = struct.getStruct("before");
        /*
             1. both before and after present -> update
             2. only before present           -> delete
             3. only after present            -> insert
        */
        if (before != null && after != null) {
            // update: ship the new (after) image
            hashMap.put("data", structToMap(after));
        } else if (after != null) {
            // insert
            hashMap.put("data", structToMap(after));
        } else if (before != null) {
            // delete: ship the old (before) image so the sink knows which row to remove
            hashMap.put("data", structToMap(before));
        }

        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }
        // "update" and "delete" are passed through unchanged
        hashMap.put("type", type);

        Gson gson = new Gson();
        collector.collect(gson.toJson(hashMap));
    }

    // copy every field of a Debezium Struct into a plain map
    private HashMap<String, Object> structToMap(Struct struct) {
        HashMap<String, Object> hm = new HashMap<>();
        Schema schema = struct.schema();
        for (Field field : schema.fields()) {
            hm.put(field.name(), struct.get(field.name()));
        }
        return hm;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
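For reference, the JSON strings this schema emits look roughly like the following; the keys come from the code above, while the column values are only illustrative:

// Illustrative example of one collected record for an INSERT on amir.amirone
// (Gson does not guarantee key order; actual columns depend on your table):
String exampleInsert =
        "{\"database\":\"amir\",\"table\":\"amirone\",\"type\":\"insert\","
        + "\"data\":{\"ID\":\"6\",\"CRON\":\"7\"}}";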

5. Create the sink that applies the changes to MySQL, handling insert, delete and update separately. To write to Oracle, HDFS, Hive, ClickHouse, etc. instead, swap in the corresponding connection details (an Oracle sketch follows after the code below).

import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.HashMap;

public class MysqlSink extends RichSinkFunction<String> {
    Connection connection;
    PreparedStatement iStmt, dStmt, uStmt;

    private Connection getConnection() {
        Connection conn = null;
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            String url = "jdbc:mysql://192.168.16.162:3306/hmm?useSSL=false";
            conn = DriverManager.getConnection(url, "root", "123456");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String insertSql = "insert into amirtwo(ID,CRON) values (?,?)";
        String deleteSql = "delete from amirtwo where ID=?";
        String updateSql = "update amirtwo set CRON=? where ID=?";
        iStmt = connection.prepareStatement(insertSql);
        dStmt = connection.prepareStatement(deleteSql);
        uStmt = connection.prepareStatement(updateSql);
    }

    // called once per incoming record, e.g.
    // {"database":"amir","table":"amirone","type":"insert","data":{"ID":"6","CRON":"7"}}
    @Override
    public void invoke(String value, Context context) throws Exception {
        Gson t = new Gson();
        HashMap<String, Object> hs = t.fromJson(value, HashMap.class);
        String database = (String) hs.get("database");
        String table = (String) hs.get("table");
        String type = (String) hs.get("type");

        if ("amir".equals(database) && "amirone".equals(table)) {
            if ("insert".equals(type)) {
                System.out.println("insert => " + value);
                LinkedTreeMap<String, Object> data = (LinkedTreeMap<String, Object>) hs.get("data");
                String id = (String) data.get("ID");
                String cron = (String) data.get("CRON");
                iStmt.setString(1, id);
                iStmt.setString(2, cron);
                iStmt.executeUpdate();
            } else if ("delete".equals(type)) {
                System.out.println("delete => " + value);
                LinkedTreeMap<String, Object> data = (LinkedTreeMap<String, Object>) hs.get("data");
                String id = (String) data.get("ID");
                dStmt.setString(1, id);
                dStmt.executeUpdate();
            } else if ("update".equals(type)) {
                System.out.println("update => " + value);
                LinkedTreeMap<String, Object> data = (LinkedTreeMap<String, Object>) hs.get("data");
                String id = (String) data.get("ID");
                String cron = (String) data.get("CRON");
                uStmt.setString(1, cron);
                uStmt.setString(2, id);
                uStmt.executeUpdate();
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();

        if (iStmt != null) {
            iStmt.close();
        }
        if (dStmt != null) {
            dStmt.close();
        }
        if (uStmt != null) {
            uStmt.close();
        }

        if (connection != null) {
            connection.close();
        }
    }
}
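For an Oracle target, only the JDBC driver, the connection URL and the three SQL statements change. The sketch below is hypothetical and not from the original post: the host, service name, schema and credentials are placeholder assumptions, and an Oracle JDBC driver (for example com.oracle.database.jdbc:ojdbc8) would have to be added to the pom.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Hypothetical Oracle variant of MysqlSink: same structure, different driver/URL/SQL.
public class OracleSink extends RichSinkFunction<String> {
    private Connection connection;
    private PreparedStatement iStmt, dStmt, uStmt;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("oracle.jdbc.OracleDriver");
        // placeholder host/service/credentials -- adjust to your environment
        connection = DriverManager.getConnection(
                "jdbc:oracle:thin:@//192.168.16.163:1521/ORCLPDB1", "hmm", "123456");
        iStmt = connection.prepareStatement("insert into AMIRTWO(ID, CRON) values (?, ?)");
        dStmt = connection.prepareStatement("delete from AMIRTWO where ID = ?");
        uStmt = connection.prepareStatement("update AMIRTWO set CRON = ? where ID = ?");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Parse the JSON produced by JsonDebeziumDeserializationSchema and dispatch to
        // iStmt/dStmt/uStmt exactly as in MysqlSink above; the logic is unchanged.
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (iStmt != null) iStmt.close();
        if (dStmt != null) dStmt.close();
        if (uStmt != null) uStmt.close();
        if (connection != null) connection.close();
    }
}

The invoke body is left as a comment because it would be the same dispatch logic as MysqlSink; only the connection setup differs.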

6. Run MySqlBinlogSourceExample and verify the source and sink.
Result: 3 inserts, 1 delete and 4 updates on the source table are propagated in real time from the business table in database A (amir) to the target table in database B (hmm).

Part 2: Using Flink SQL CDC, a SQL-oriented approach that is easy to get started with

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToMysqlMain {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // trigger a checkpoint every 3000 ms (checkpoint interval)
        env.enableCheckpointing(3000);
        // advanced options:
        // exactly-once mode (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // leave at least 1500 ms between checkpoints (minimum pause between checkpoints)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);
        // a checkpoint must complete within one minute or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // keep checkpoint data after the job is cancelled so it can be restored later:
        //   RETAIN_ON_CANCELLATION: checkpoints are kept when the job is cancelled
        //   DELETE_ON_CANCELLATION: checkpoints are deleted on cancel and kept only on failure
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " ID STRING,\n" +
                        " CRON STRING,\n" +
                        " primary key (ID) not enforced\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = '192.168.16.162',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = '123456',\n" +
                        " 'database-name' = 'amir',\n" +
                        " 'table-name' = 'amirone',\n" +
                        " 'scan.startup.mode' = 'latest-offset'\n" +
                        ")";
        // sink table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                        " ID STRING,\n" +
                        " CRON STRING,\n" +
                        " primary key (ID) not enforced\n" +
                        ") WITH (\n" +
                        " 'connector' = 'jdbc',\n" +
                        " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                        " 'url' = 'jdbc:mysql://192.168.16.162:3306/hmm?serverTimezone=UTC&useSSL=false',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = '123456',\n" +
                        " 'table-name' = 'amirtwo'\n" +
                        ")";
        // synchronization logic: a plain INSERT ... SELECT (transformations could be added here)
        String transformDmlSQL = "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        // executeSql on an INSERT statement submits the job by itself, so env.execute() is not
        // needed; await() keeps the local process alive while the streaming job runs
        tableEnv.executeSql(transformDmlSQL).await();
    }

}
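If several tables need to be synchronized in one job with this SQL approach, the INSERT statements can be grouped into a StatementSet so that they are submitted as a single Flink job. A minimal sketch, assuming a hypothetical second source/sink pair (mysql_binlog_2 and test_cdc_sink_2) has been declared with its own DDL:

// requires: import org.apache.flink.table.api.StatementSet;
StatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsertSql("insert into test_cdc_sink select * from mysql_binlog");
statementSet.addInsertSql("insert into test_cdc_sink_2 select * from mysql_binlog_2");
// execute() submits all statements as one job; await() blocks while the streaming job runs
statementSet.execute().await();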

Final code structure: pom.xml plus the classes above (MySqlBinlogSourceExample, JsonDebeziumDeserializationSchema, MysqlSink, MysqlToMysqlMain).
Ending
Chasing the dream, time will tell, yep!!!
