CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows or tables), record these changes completely and in the order they occur, and write them to a message queue so that other services can subscribe to and consume them.
CDC comes in two main flavors: query-based and Binlog-based. The key differences between them are:
| | Query-based CDC | Binlog-based CDC |
| --- | --- | --- |
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Latency | High | Low |
| Adds load to the database | Yes | No |
The Flink community has developed the flink-cdc-connectors component, a source connector that reads both full snapshot data and incremental change data directly from databases such as MySQL and PostgreSQL. It is open source: https://github.com/ververica/flink-cdc-connectors
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

```java
package com.atguigu;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        //1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 Enable checkpointing
        //env.enableCheckpointing(5000);
        //env.getCheckpointConfig().setCheckpointTimeout(10000);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

        //2. Build the SourceFunction with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("cdc_test")
                //If no table is configured, all tables in the database are monitored
                //.tableList("cdc_test.user_info")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //3. Print the data
        dataStreamSource.print();

        //4. Start the job
        env.execute("FlinkCDC");
    }
}
```

StartupOptions:
- initial: On the first start, read the table's existing historical data (operation type READ), then keep taking checkpoints. On the next start you must point the job at the concrete checkpoint file so that it can resume where it left off; even if Flink crashes, the restarted job reads from the last offset rather than from latest. Checkpoints only become useful once the job is packaged and deployed, because only then can a concrete checkpoint location be specified.
- earliest: Read from the very first entry of the binlog. Ideally the binlog should already be enabled before the database and its data are created, so that the binlog really covers everything from the beginning.
- latest: Read only the latest changes, i.e. changes made after the Flink job starts.
- timestamp: Start reading the binlog from a given point in time.
- specificOffset: Specify the binlog file and the offset to start reading from. This is rarely used in practice, because the offset is not stored locally, so it is hard to know which offset reading has reached.
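The minimal sketch below shows how these startup modes plug into the MySqlSource builder from the DataStream example above. The class name FlinkCDCStartupModes is made up for this illustration, and the commented-out calls (in particular the timestamp value and the exact factory signatures for timestamp/specificOffset) are assumptions that may vary between connector versions:

```java
package com.atguigu;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: same builder as in the FlinkCDC class above, with the startup mode swapped.
public class FlinkCDCStartupModes {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("cdc_test")
                .deserializer(new StringDebeziumDeserializationSchema())
                // Pick exactly one startup mode:
                .startupOptions(StartupOptions.initial())    // snapshot existing rows, then follow the binlog
                //.startupOptions(StartupOptions.earliest()) // replay the binlog from its first event
                //.startupOptions(StartupOptions.latest())   // only changes made after the job starts
                //.startupOptions(StartupOptions.timestamp(1640966400000L)) // assumed signature: start from a binlog timestamp (ms)
                .build();

        env.addSource(sourceFunction).print();
        env.execute("FlinkCDCStartupModes");
    }
}
```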
2) Enable the MySQL binlog and restart MySQL
3) Start the Flink cluster
[atguigu@hadoop102 flink-standalone]$ bin/start-cluster.sh
4) Start the HDFS cluster
[atguigu@hadoop102 flink-standalone]$ start-dfs.sh
5) Submit the job
[atguigu@hadoop102 flink-standalone]$ bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
6) Insert, update, or delete data in the monitored MySQL table cdc_test.user_info
7) Take a savepoint of the running Flink job
[atguigu@hadoop102 flink-standalone]$ bin/flink savepoint JobId hdfs://hadoop102:8020/flink/save
8) Stop the job, then restart it from the savepoint
[atguigu@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
```java
package com.atguigu;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSQLCDC {

    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. Build the CDC table with a Flink SQL DDL
        //   (Flink SQL requires NOT ENFORCED on primary key constraints)
        tableEnv.executeSql("CREATE TABLE user_info ( " +
                " id STRING PRIMARY KEY NOT ENFORCED, " +
                " name STRING, " +
                " sex STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'hostname' = 'hadoop102', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '000000', " +
                " 'database-name' = 'cdc_test', " +
                " 'table-name' = 'user_info' " +
                ")");

        //3. Query the data and convert the result to a retract stream
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        //4. Start the job
        env.execute("FlinkSQLCDC");
    }
}
```

2.3.1 Code Implementation
```java
package com.atguigu.func;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {

    /**
     * Target output format:
     * {
     *   "db":"",
     *   "tableName":"",
     *   "before":{"id":"1001","name":""...},
     *   "after":{"id":"1001","name":""...},
     *   "op":""
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //Create a JSON object to hold the result
        JSONObject result = new JSONObject();

        //Extract the database name & table name from the record topic
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        result.put("db", fields[1]);
        result.put("tableName", fields[2]);

        //Extract the "before" data
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            //Get the column information
            Schema schema = before.schema();
            List<Field> fieldList = schema.fields();

            for (Field field : fieldList) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);

        //Extract the "after" data
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            //Get the column information
            Schema schema = after.schema();
            List<Field> fieldList = schema.fields();

            for (Field field : fieldList) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);

        //Extract the operation type
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("op", operation);

        //Emit the record
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```
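To use this deserializer, pass it to the builder in place of StringDebeziumDeserializationSchema. The fragment below sketches the only change needed in step 2 of the earlier FlinkCDC class:

```java
//2. Build the SourceFunction with Flink CDC, now emitting the simplified JSON format above
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
        .hostname("hadoop102")
        .port(3306)
        .username("root")
        .password("000000")
        .databaseList("cdc_test")
        .deserializer(new CustomerDeserializationSchema()) // custom deserializer instead of StringDebeziumDeserializationSchema
        .startupOptions(StartupOptions.initial())
        .build();
```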

The figures in this chapter come from the talk《详解 Flink-CDC》(Flink CDC in Depth) given at the Beijing Flink Meetup.
Pain points of the 1.x design (figure from the Meetup talk omitted):
For the initialization (snapshot) mode on tables with a primary key, the overall flow consists of 5 stages:
1. Chunk splitting;
2. Chunk assignment (enabling parallel reads & checkpointing);
3. Chunk reading (enabling lock-free reads);
4. Chunk reporting;
5. Chunk assignment (of the Binlog chunk).
Following the lock-free algorithm from Netflix's DBLog paper, the target table is sharded by primary key, and each chunk's key range is made left-closed/right-open (or left-open/right-closed) so that together the chunks cover the data without gaps or overlaps.
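As a rough illustration of this splitting step (not the connector's actual implementation), the sketch below divides a numeric primary-key range into left-closed, right-open chunks of a fixed size; the class and method names are invented for this example:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split a numeric primary-key range into [start, end) chunks.
public class ChunkSplitSketch {

    /** A left-closed, right-open key interval; end == null means "no upper bound". */
    public static class Chunk {
        final long start;
        final Long end;

        Chunk(long start, Long end) {
            this.start = start;
            this.end = end;
        }
    }

    public static List<Chunk> splitIntoChunks(long minKey, long maxKey, long chunkSize) {
        List<Chunk> chunks = new ArrayList<>();
        long start = minKey;
        // Emit fixed-size chunks [start, start + chunkSize) until less than one chunk remains.
        while (maxKey - start >= chunkSize) {
            chunks.add(new Chunk(start, start + chunkSize));
            start += chunkSize;
        }
        // The last chunk is unbounded on the right, so rows inserted during the snapshot are still covered.
        chunks.add(new Chunk(start, null));
        return chunks;
    }
}
```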
The chunks are then handed out to multiple SourceReaders, each of which reads one part of the table, which achieves parallel reading.
Each chunk can also be checkpointed independently while it is being read: if reading one chunk fails, only that chunk's task needs to be re-executed, instead of restarting the whole read from the beginning as in 1.x.
If every SourceReader guarantees consistency for its own data, the whole table is consistent.
Reading a chunk goes through 5 steps:
1) Before reading the table data, the SourceReader records the current binlog position as the low watermark;
2) The SourceReader queries the data within its own key range and places it in a buffer;
3) When the query finishes, it records the current binlog position as the high watermark;
4) For the incremental part, it consumes the binlog from the low watermark to the high watermark;
5) It then corrects the buffered data by primary key using those binlog events and emits the result.
These 5 steps guarantee that each chunk's final output is the newest state of that chunk as of the high watermark; so far, however, this only guarantees consistency within a single chunk (a simplified sketch of the correction step follows below).
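A minimal, purely illustrative sketch of step 5 (not the connector's code), assuming the buffer is a map from primary key to row and the binlog window has already been filtered to this chunk's key range:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of correcting a chunk's snapshot buffer with the binlog
// events captured between the low and high watermark.
public class ChunkMergeSketch {

    /** A change event replayed from the binlog; after == null represents a DELETE. */
    public static class ChangeEvent {
        final String key;
        final Map<String, Object> after;

        ChangeEvent(String key, Map<String, Object> after) {
            this.key = key;
            this.after = after;
        }
    }

    public static Map<String, Map<String, Object>> mergeChunk(
            Map<String, Map<String, Object>> snapshotBuffer, List<ChangeEvent> binlogWindow) {
        Map<String, Map<String, Object>> result = new HashMap<>(snapshotBuffer);
        for (ChangeEvent event : binlogWindow) {
            if (event.after == null) {
                // The row was deleted while the snapshot query was running.
                result.remove(event.key);
            } else {
                // An insert/update after the snapshot query overrides the snapshot value.
                result.put(event.key, event.after);
            }
        }
        return result;
    }
}
```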
After a snapshot chunk has been read there is a reporting step, as shown in the figure above: the SourceReader reports the completion of the snapshot chunk to the SourceEnumerator.
Flink CDC supports full + incremental synchronization. Once the SourceEnumerator has received the completion reports for all snapshot chunks, there is still the task of consuming the incremental data (the binlog); this is done by handing a binlog chunk to any one SourceReader, which reads it with a single degree of parallelism.
MySqlHybridSplitAssigner:

```java
private MySqlBinlogSplit createBinlogSplit() {

    final List<MySqlSnapshotSplit> assignedSnapshotSplit =
            snapshotSplitAssigner.getAssignedSplits().values().stream()
                    .sorted(Comparator.comparing(MySqlSplit::splitId))
                    .collect(Collectors.toList());
    Map<String, BinlogOffset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();
    final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();

    final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
    BinlogOffset minBinlogOffset = BinlogOffset.INITIAL_OFFSET;
    for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
        // find the min binlog offset
        BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
        if (binlogOffset.compareTo(minBinlogOffset) < 0) {
            minBinlogOffset = binlogOffset;
        }
        finishedSnapshotSplitInfos.add(
                new FinishedSnapshotSplitInfo(
                        split.getTableId(),
                        split.splitId(),
                        split.getSplitStart(),
                        split.getSplitEnd(),
                        binlogOffset));
        tableSchemas.putAll(split.getTableSchemas());
    }
    final MySqlSnapshotSplit lastSnapshotSplit =
            assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();
    return new MySqlBinlogSplit(
            BINLOG_SPLIT_ID,
            lastSnapshotSplit.getSplitKeyType(),
            minBinlogOffset,
            BinlogOffset.NO_STOPPING_OFFSET,
            finishedSnapshotSplitInfos,
            tableSchemas);
}
```

3.4.2 Reading the Binlog Between the Low Watermark and the High Watermark
BinlogSplitReader:

```java
/**
 * Returns the record should emit or not.
 *
 * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event
 * that belongs to its finished snapshot splits. For each snapshot split, the binlog event is
 * valid since the offset is after its high watermark.
 *
 * <pre> E.g: the data input is :
 *    snapshot-split-0 info : [0,    1024) highWatermark0
 *    snapshot-split-1 info : [1024, 2048) highWatermark1
 *  the data output is:
 *  only the binlog event belong to [0,    1024) and offset is after highWatermark0 should send,
 *  only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
 * </pre>
 */
private boolean shouldEmit(SourceRecord sourceRecord) {
    if (isDataChangeRecord(sourceRecord)) {
        TableId tableId = getTableId(sourceRecord);
        BinlogOffset position = getBinlogPosition(sourceRecord);
        // aligned, all snapshot splits of the table has reached max highWatermark
        if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
            return true;
        }
        Object[] key =
                getSplitKey(
                        currentBinlogSplit.getSplitKeyType(),
                        sourceRecord,
                        statefulTaskContext.getSchemaNameAdjuster());
        for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
            if (RecordUtils.splitKeyRangeContains(
                            key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                    && position.isAtOrBefore(splitInfo.getHighWatermark())) {
                return true;
            }
        }
        // not in the monitored splits scope, do not emit
        return false;
    }
    // always send the schema change event and signal event
    // we need record them to state of Flink
    return true;
}
```
