CDC stands for Change Data Capture. In the broad sense, any technique that captures data changes can be called CDC. The CDC we usually talk about targets databases: it is a technique for capturing changes to the data stored in a database. Typical use cases:
Data synchronization, for backup and disaster recovery
Data distribution, fanning one source out to multiple downstream consumers
Data collection (the E in ETL), data integration into data warehouses / data lakes
Mainstream implementations fall into two categories:
Query-based CDC
- a. Offline, scheduled query jobs; batch processing.
- b. Cannot guarantee data consistency.
- c. No real-time guarantee.
Log-based CDC
- a. Consumes the database log in real time; stream processing.
- b. Guarantees data consistency.
- c. Delivers data in real time.
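To make the contrast concrete, here is a minimal sketch of query-based CDC as a JDBC polling loop. The table and column names (orders, update_time) are hypothetical and not part of the project below; the point is that rows deleted between polls, and intermediate versions of a row updated twice, are never observed, which is why this approach guarantees neither consistency nor freshness.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class PollingCdcSketch {
    public static void main(String[] args) throws Exception {
        Timestamp lastSeen = new Timestamp(0L);
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.1.180:3306/test", "root", "123456")) {
            while (true) {
                // Each pass only sees rows whose update_time moved forward;
                // deletes and overwritten intermediate states are silently lost.
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, name, update_time FROM orders " +
                        "WHERE update_time > ? ORDER BY update_time")) {
                    ps.setTimestamp(1, lastSeen);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            lastSeen = rs.getTimestamp("update_time");
                            System.out.println(rs.getLong("id") + " -> " + rs.getString("name"));
                        }
                    }
                }
                Thread.sleep(5_000L); // a scheduled batch, not a stream
            }
        }
    }
}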
Flink CDC implements the log-based approach; the flink-cdc-connectors project lives at https://github.com/ververica/flink-cdc-connectors. Maven setup for the demo project:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Flink-learning</artifactId>
        <groupId>com.wudl.flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Flink-cdc2.0</artifactId>
    <properties>
        <flink-version>1.13.0</flink-version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.13.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

A first DataStream job, reading the table with the default StringDebeziumDeserializationSchema:

package com.wud.cdc2;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Keep the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Store checkpoint state on HDFS
        env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/cdc2.0-test/ck"));
        // User name for HDFS access
        System.setProperty("HADOOP_USER_NAME", "hdfs");

        DebeziumSourceFunction<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.1.180")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                .tableList("test.Flink_iceberg")          // qualified as database.table
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial()) // full snapshot first, then binlog
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(mySqlSource);
        dataStreamSource.print();
        env.execute();
    }
}

Running the job prints the snapshot records in their raw SourceRecord form:

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585007,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585013}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585015,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585016}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10013,name=flink-mysqA3,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10014,name=flink-mysqA4,age=19,dt=2021-09-28},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585017}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585017,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10011,name=flink-mysql,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10012,name=flink-mysqA,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10013,name=flink-mysqA3,age=19,dt=2021-09-24},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10014,name=flink-mysqA4,age=19,dt=2021-09-28},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=true,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1633178585, file=mysql-bin.000036, pos=765}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=10050,name=flink-cdc-add,age=21,dt=2021-10-2},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1633178585018,snapshot=last,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000036,pos=765,row=0},op=r,ts_ms=1633178585018}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

Package the jar and submit it to the cluster:

[root@basenode flink-1.13.2]# bin/flink run -c com.wud.cdc2.FlinkCDC /opt/datas/Flink-cdc2.0-1.0-SNAPSHOT-jar-with-dependencies.jar

Job has been submitted with JobID 137b680a6bb934e43568f14f6583b62c
Create a savepoint for the running job:
[root@basenode flink-1.13.2]# bin/flink savepoint e8e918c2517a777e817c630cf1d6b932 hdfs://192.168.1.161:8020/cdc-test/savepoint
Triggering savepoint for job e8e918c2517a777e817c630cf1d6b932.
Waiting for response...
Savepoint completed. Path: hdfs://192.168.1.161:8020/cdc-test/savepoint/savepoint-e8e918-9ef094f349be
You can resume your program from this savepoint with the run command.
[root@basenode flink-1.13.2]#
Then insert new rows into MySQL and resume the job from the savepoint:
[root@basenode flink-1.13.2]# bin/flink run -s hdfs://192.168.1.161:8020/cdc-test/savepoint/savepoint-e8e918-9ef094f349be -c com.wud.cdc2.FlinkCDC /opt/datas/Flink-cdc2.0-1.0-SNAPSHOT-jar-with-dependencies.jar
Job has been submitted with JobID 474a0da99820aa6025203f9806b9fcad
Checking the logs, we can see the raw structure of a Flink CDC record:
SourceRecord{sourcePartition={server=mysql_binlog_source},
sourceOffset={file=mysql-bin.000063, pos=154}}
ConnectRecord{topic='mysql_binlog_source.wudl-gmall.user_info', kafkaPartition=null, key=Struct{id=4000}, keySchema=Schema{mysql_binlog_source.wudl_gmall.user_info.Key:STRUCT}, value=Struct{after=Struct{id=4000,login_name=i0v0k9,nick_name=xxx,name=xxx,phone_num=137xxxxx,email=xxxx@qq.com,user_level=1,birthday=1969-12-04,gender=F,create_time=2020-12-04 23:28:45},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=wudl-gmall,table=user_info,server_id=0,file=mysql-bin.000063,pos=154,row=0},op=c,ts_ms=1636255826014}, valueSchema=Schema{mysql_binlog_source.wudl_gmall.user_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
We can plug in a custom deserialization schema that flattens each record into JSON:
package com.wud.cdc2;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
    /**
     * Flattens a raw Debezium record such as:
     *
     * SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1636269821, file=mysql-bin.000063, pos=6442}} ConnectRecord{topic='mysql_binlog_source.test.Flink_iceberg', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=102,name=flinksql,age=25,dt=2021-11-08},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1636269821531,snapshot=last,db=test,table=Flink_iceberg,server_id=0,file=mysql-bin.000063,pos=6442,row=0},op=r,ts_ms=1636269821531}, valueSchema=Schema{mysql_binlog_source.test.Flink_iceberg.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
     *
     * @param sourceRecord a single change record
     * @param collector    emits the flattened JSON string
     * @throws Exception
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        JSONObject result = new JSONObject();
        // The topic is "<server>.<database>.<table>"
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        result.put("db", fields[1]);
        result.put("tableName", fields[2]);
        // Extract the "before" row image
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            // Walk the column list
            Schema schema = before.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);
        // Extract the "after" row image
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema schema = after.schema();
            List<Field> afterFields = schema.fields();
            for (Field field : afterFields) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);
        // Operation type: READ / CREATE / UPDATE / DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("op", operation);
        // Emit the flattened record
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Wire the custom deserializer into the same FlinkCDC job (checkpoint settings commented out here for local testing):
package com.wud.cdc2;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // env.enableCheckpointing(5000L);
        // env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // // Keep the last checkpoint when the job is cancelled
        // env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // // Store checkpoint state on HDFS
        // env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/cdc2.0-test/ck"));
        // // User name for HDFS access
        // System.setProperty("HADOOP_USER_NAME","hdfs");

        DebeziumSourceFunction<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.1.180")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("test")
                .tableList("test.Flink_iceberg")
                // .deserializer(new StringDebeziumDeserializationSchema())
                .deserializer(new CustomerDeserialization()) // flatten records to JSON
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(mySqlSource);
        dataStreamSource.print();
        env.execute();
    }
}

After changing a row in MySQL, the console prints:
{"op":"UPDATE","before":{"dt":"2021-11-07","name":"spark","id":104,"age":22},"after":{"dt":"2021-11-07","name":"spark02","id":104,"age":22},"db":"test","tableName":"Flink_iceberg"}
Note that the sink table must have a primary key; without one, every update arrives downstream as an extra row. Once a primary key is defined, updates are applied in place rather than appended.
Source table schema in MySQL:
CREATE TABLE `Flink_iceberg` (
  `id` bigint(64) DEFAULT NULL,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `dt` varchar(64) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
Implementation (Flink SQL, MySQL to MySQL):
package com.wud.cdc2;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCdc20MysqlToMysql {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Flink SQL only accepts NOT ENFORCED primary keys, since Flink does not own the data
        String sourceSql = "CREATE TABLE IF NOT EXISTS mySqlSource (" +
                "  id BIGINT," +
                "  name STRING," +
                "  age INT," +
                "  dt STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'hostname' = '192.168.1.180'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = '123456'," +
                "  'database-name' = 'test'," +
                "  'table-name' = 'Flink_iceberg'" +
                ")";

        String sinkSql = "CREATE TABLE IF NOT EXISTS mySqlSink (" +
                "  id BIGINT," +
                "  name STRING," +
                "  age INT," +
                "  dt STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://192.168.1.180:3306/test'," +
                "  'table-name' = 'Flink_iceberg-cdc'," +
                "  'username' = 'root'," +
                "  'password' = '123456'" +
                ")";
        tableEnv.executeSql(sourceSql);
        tableEnv.executeSql(sinkSql);
        // executeSql submits the INSERT job by itself; no env.execute() is needed
        tableEnv.executeSql("insert into mySqlSink select * from mySqlSource");
    }
}

- {"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19},"db":"test","tableName":"Flink_iceberg"}
- {"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19},"db":"test","tableName":"Flink_iceberg"}
- {"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19},"db":"test","tableName":"Flink_iceberg"}
- {"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19},"db":"test","tableName":"Flink_iceberg"}
- {"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysqA","id":10012,"age":19},"db":"test","tableName":"Flink_iceberg"}
- {"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysqA3","id":10013,"age":19},"db":"test","tableName":"Flink_iceberg"}
- {"op":"READ","before":{},"after":{"dt":"2021-09-28","name":"flink-mysqA4","id":10014,"age":19},"db":"test","tableName":"Flink_iceberg"}
- {"op":"READ","before":{},"after":{"dt":"2021-09-24","name":"flink-mysql","id":10011,"age":19},"db":"test","tableName":"Flink_iceberg"}
- {"op":"READ","before":{},"after":{"dt":"2021-11-07","name":"flink","id":101,"age":20},"db":"test","tableName":"Flink_iceberg"}
- {"op":"READ","before":{},"after":{"dt":"2021-11-08","name":"flinksql","id":102,"age":25},"db":"test","tableName":"Flink_iceberg"}
- {"op":"READ","before":{},"after":{"dt":"2021-11-09","name":"flink-table","id":103,"age":21},"db":"test","tableName":"Flink_iceberg"}
- {"op":"READ","before":{},"after":{"dt":"2021-11-07","name":"spark","id":104,"age":22},"db":"test","tableName":"Flink_iceberg"}
- {"op":"READ","before":{},"after":{"dt":"2021-11-07","name":"hbase","id":105,"age":25},"db":"test","tableName":"Flink_iceberg"}
- {"op":"UPDATE","before":{"dt":"2021-11-07","name":"spark","id":104,"age":22},"after":{"dt":"2021-11-07","name":"spark02","id":104,"age":22},"db":"test","tableName":"Flink_iceberg"}
- {"op":"CREATE","before":{},"after":{"dt":"2021-11-07","name":"flinkcdc","id":106,"age":22},"db":"test","tableName":"Flink_iceberg"}