Big Data Technology: Flink-CDC

1 Introduction to CDC

1.1 What Is CDC

CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of data or tables), record those changes completely in the order they occur, and write them to message middleware so that other services can subscribe to and consume them.

1.2 Types of CDC

CDC comes in two main flavors: query-based and binlog-based. The key differences between them are:

                           | Query-based CDC          | Binlog-based CDC
Open-source products       | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium
Execution mode             | Batch                    | Streaming
Captures all data changes  | No                       | Yes
Latency                    | High                     | Low
Adds load on the database  | Yes                      | No

1.3 Flink-CDC

The Flink community has developed the flink-cdc-connectors component, a source connector that can read both full (snapshot) data and incremental change data directly from databases such as MySQL and PostgreSQL. It is open source at: https://github.com/ververica/flink-cdc-connectors

2 Flink CDC in Practice

2.1 Using the DataStream API

2.1.1 Add the Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2.1.2 Write the Code

package com.atguigu;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {
        // 1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1.1 Enable checkpointing (uncomment when running on a cluster)
        // env.enableCheckpointing(5000);
        // env.getCheckpointConfig().setCheckpointTimeout(10000);
        // env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //
        // env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

        // 2. Build a SourceFunction with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("cdc_test")
                // if no table is configured, all tables in the database are monitored
                // .tableList("cdc_test.user_info")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        // 3. Print the data
        dataStreamSource.print();

        // 4. Start the job
        env.execute("FlinkCDC");
    }
}

StartupOptions:

initial:
        On the first start, read the table's existing historical data (operation type READ), then keep taking checkpoints.
        On a restart, the checkpoint/savepoint path must be specified explicitly so the job resumes from where it left off; even if Flink crashes, it restarts from the last recorded offset rather than from the latest position.
        Checkpointing only matters once the job is packaged and deployed, because only then can a concrete checkpoint path be given.

earliest:
        Read from the first entry of the binlog. This only works if the binlog was enabled before the database was created and populated, so that the binlog actually contains the full history.

latest:
        Read only the latest changes, i.e. changes made after the Flink program starts.

timestamp:
        Start reading the binlog from a given point in time.

specificOffset:
        Start reading from a specific binlog file and offset.
        This is rarely used in practice, because the offset is not stored locally and it is hard to know which offset has already been read.
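As an illustration, the sketch below shows how each of these startup modes would be selected on the MySqlSource builder from section 2.1.2. It is a minimal sketch, not part of the original example: the binlog file name, offset, and timestamp are placeholder values, and the exact factory-method signatures may differ slightly between connector versions.

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

public class StartupOptionsDemo {

    // Builds the same source as in 2.1.2, but shows all startup modes side by side.
    public static DebeziumSourceFunction<String> buildSource() {
        return MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("cdc_test")
                .deserializer(new StringDebeziumDeserializationSchema())
                // Pick exactly one of the following startup modes:
                // .startupOptions(StartupOptions.initial())                  // snapshot first, then binlog
                // .startupOptions(StartupOptions.earliest())                 // from the first binlog event
                // .startupOptions(StartupOptions.latest())                   // only changes after the job starts
                // .startupOptions(StartupOptions.timestamp(1640966400000L))  // from a binlog timestamp in ms (placeholder)
                .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4))  // placeholder binlog file + position
                .build();
    }
}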

2.1.3 Testing the Example

1) Package the project with Maven and upload flink-1.0-SNAPSHOT-jar-with-dependencies.jar to the cluster

2) Enable the MySQL binlog and restart MySQL

3) Start the Flink cluster

[atguigu@hadoop102 flink-standalone]$ bin/start-cluster.sh

4) Start the HDFS cluster

[atguigu@hadoop102 flink-standalone]$ start-dfs.sh

5) Start the program

[atguigu@hadoop102 flink-standalone]$ bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar

6) Insert, update, or delete rows in the cdc_test.user_info table in MySQL and watch the program's output

7) Create a savepoint for the running Flink job (replace JobId with the actual job ID)

[atguigu@hadoop102 flink-standalone]$ bin/flink savepoint JobId hdfs://hadoop102:8020/flink/save

8) Stop the program and then restart it from the savepoint

[atguigu@hadoop102 flink-standalone]$ bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar

2.2 Using the Flink SQL API

2.2.1 Code

package com.atguigu;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSQLCDC {

    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the CDC table with Flink SQL DDL
        //    (Flink requires the primary key constraint to be declared NOT ENFORCED)
        tableEnv.executeSql("CREATE TABLE user_info ( " +
                " id STRING, " +
                " name STRING, " +
                " sex STRING, " +
                " PRIMARY KEY (id) NOT ENFORCED " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'hostname' = 'hadoop102', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '000000', " +
                " 'database-name' = 'cdc_test', " +
                " 'table-name' = 'user_info' " +
                ")");

        // 3. Query the table and convert the result to a retract stream
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        // 4. Start the job
        env.execute("FlinkSQLCDC");
    }
}

2.3 Custom Deserialization Schema

2.3.1 Code

package com.atguigu.func;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {

    /**
     * Output format:
     * {
     *   "db":"",
     *   "tableName":"",
     *   "before":{"id":"1001","name":""...},
     *   "after":{"id":"1001","name":""...},
     *   "op":""
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // Create a JSON object to hold the result
        JSONObject result = new JSONObject();

        // Extract the database and table names from the topic (server.db.table)
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        result.put("db", fields[1]);
        result.put("tableName", fields[2]);

        // Extract the "before" data
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            // Extract the column information
            Schema schema = before.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);

        // Extract the "after" data
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            // Extract the column information
            Schema schema = after.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);

        // Extract the operation type
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("op", operation);

        // Emit the result
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
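To use this schema, pass an instance to the source builder from section 2.1.2 in place of StringDebeziumDeserializationSchema. A minimal sketch with the same connection settings as above:

DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
        .hostname("hadoop102")
        .port(3306)
        .username("root")
        .password("000000")
        .databaseList("cdc_test")
        // emit each change event as the JSON structure documented above
        .deserializer(new CustomerDeserializationSchema())
        .startupOptions(StartupOptions.initial())
        .build();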

3 Flink-CDC 2.0

The figures referenced in this chapter come from the presentation "详解 Flink-CDC" (Flink CDC Explained) given at the Beijing Flink Meetup.

3.1 Pain Points

Flink-CDC 1.x has several pain points: producing a consistent full snapshot requires locking the database or table; the snapshot phase runs with only a single parallelism; and no checkpoint can be taken during the snapshot phase, so any failure means re-reading the entire table from scratch.

3.2 Design Goals

Flink-CDC 2.0 targets exactly these pain points: lock-free snapshot reading, parallel reading of the full data, and checkpoint support during the snapshot phase so that reads can resume after a failure.

3.3 Design and Implementation

3.3.1 Overview

For tables with a primary key, the initial (snapshot + binlog) mode works in five stages:

1. Chunk splitting;
2. Chunk distribution (enables parallel reading and per-chunk checkpointing);
3. Chunk reading (enables lock-free reading);
4. Chunk completion reporting;
5. Binlog chunk distribution.

3.3.2 Chunk Splitting

Following the lock-free algorithm from Netflix's DBLog paper, the target table is split into chunks by primary key, with each chunk covering a left-closed/right-open (or left-open/right-closed) key interval so that the chunks are contiguous and non-overlapping.
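The idea can be illustrated with a simplified, hypothetical sketch (not the connector's actual implementation): split the numeric primary-key range of a table into fixed-size, left-closed/right-open chunks.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// A simplified, hypothetical sketch of primary-key chunking. The real connector
// handles arbitrary key types and unevenly distributed keys; this only shows the
// idea of contiguous, non-overlapping [start, end) intervals.
public class ChunkSplitSketch {

    // Each long[]{start, end} covers the key interval [start, end).
    static List<long[]> split(long minKey, long maxKey, long chunkSize) {
        List<long[]> chunks = new ArrayList<>();
        long start = minKey;
        while (start <= maxKey) {
            long end = Math.min(start + chunkSize, maxKey + 1);
            chunks.add(new long[]{start, end});
            start = end;
        }
        return chunks;
    }

    public static void main(String[] args) {
        // keys 1..10000 with chunk size 4096 -> [1,4097), [4097,8193), [8193,10001)
        for (long[] chunk : split(1, 10_000, 4_096)) {
            System.out.println(Arrays.toString(chunk));
        }
    }
}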

3.3.3 Chunk Distribution

The chunks are distributed to multiple SourceReaders, each of which reads one part of the table; this is what makes parallel reading possible.

Each chunk can also be checkpointed individually while it is being read, so if reading a chunk fails, only that chunk's task needs to be re-executed, instead of re-reading everything from the beginning as in 1.x.

If every SourceReader guarantees consistency for its own chunks, the whole table is read consistently.

3.3.4 Chunk Reading

Reading a single chunk goes through five steps:

1) Before reading the chunk's data, the SourceReader records the current binlog position as the low watermark;

2) The SourceReader queries the rows in its key range and puts them into a buffer;

3) After the query completes, it records the current binlog position as the high watermark;

4) It then consumes the binlog events between the low watermark and the high watermark;

5) Using the primary key, it applies those events to correct the buffered rows and emits the result.

These five steps guarantee that each chunk's final output is the chunk's latest data as of its high watermark; however, this only ensures consistency within a single chunk.
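The procedure can be illustrated with a simplified, hypothetical sketch. Row, BinlogEvent, currentBinlogPosition(), queryChunk(), and readBinlog() are placeholders standing in for the connector's internals and are not real APIs; the sketch assumes a numeric primary key.

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// A hypothetical, simplified sketch of the five-step chunk read described above.
public class ChunkReadSketch {

    static class Row { long key; Object value; }

    static class BinlogEvent {
        long key;
        boolean isDelete;
        Row row;
    }

    Map<Long, Row> readChunk(long chunkStart, long chunkEnd) {
        // 1) remember the binlog position before the snapshot query: low watermark
        long lowWatermark = currentBinlogPosition();

        // 2) query the chunk's rows into an in-memory buffer, keyed by primary key
        Map<Long, Row> buffer = new TreeMap<>();
        for (Row row : queryChunk(chunkStart, chunkEnd)) {
            buffer.put(row.key, row);
        }

        // 3) remember the binlog position after the query: high watermark
        long highWatermark = currentBinlogPosition();

        // 4) replay the binlog between the two watermarks and patch the buffer,
        //    so updates/deletes that happened during the query are corrected
        for (BinlogEvent event : readBinlog(lowWatermark, highWatermark)) {
            if (event.key >= chunkStart && event.key < chunkEnd) {
                if (event.isDelete) {
                    buffer.remove(event.key);
                } else {
                    buffer.put(event.key, event.row);
                }
            }
        }

        // 5) the buffer now reflects the chunk's state at the high watermark
        return buffer;
    }

    // --- placeholders, not real connector APIs ---
    long currentBinlogPosition() { return 0L; }
    List<Row> queryChunk(long start, long end) { return java.util.Collections.emptyList(); }
    List<BinlogEvent> readBinlog(long from, long to) { return java.util.Collections.emptyList(); }
}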

3.3.5 Chunk Completion Reporting

After a snapshot chunk has been read, there is a reporting step (shown in the figure from the Meetup slides): the SourceReader reports the completion of that snapshot chunk to the SourceEnumerator.

3.3.6 Binlog Chunk Distribution

Flink CDC supports full + incremental data synchronization. After the SourceEnumerator has received the completion reports for all snapshot chunks, the incremental data (the binlog) still has to be consumed. This is done by handing a binlog chunk to one of the SourceReaders, which reads it with a single parallelism.

3.4 Core Implementation Details

3.4.1 Source Code: Starting Position of the Binlog Chunk

// MySqlHybridSplitAssigner
private MySqlBinlogSplit createBinlogSplit() {
    final List<MySqlSnapshotSplit> assignedSnapshotSplit =
            snapshotSplitAssigner.getAssignedSplits().values().stream()
                    .sorted(Comparator.comparing(MySqlSplit::splitId))
                    .collect(Collectors.toList());

    Map<String, BinlogOffset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();
    final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
    final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();

    BinlogOffset minBinlogOffset = BinlogOffset.INITIAL_OFFSET;
    for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
        // find the min binlog offset
        BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
        if (binlogOffset.compareTo(minBinlogOffset) < 0) {
            minBinlogOffset = binlogOffset;
        }
        finishedSnapshotSplitInfos.add(
                new FinishedSnapshotSplitInfo(
                        split.getTableId(),
                        split.splitId(),
                        split.getSplitStart(),
                        split.getSplitEnd(),
                        binlogOffset));
        tableSchemas.putAll(split.getTableSchemas());
    }

    final MySqlSnapshotSplit lastSnapshotSplit =
            assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();

    // the binlog split starts from the smallest finished-snapshot offset
    return new MySqlBinlogSplit(
            BINLOG_SPLIT_ID,
            lastSnapshotSplit.getSplitKeyType(),
            minBinlogOffset,
            BinlogOffset.NO_STOPPING_OFFSET,
            finishedSnapshotSplitInfos,
            tableSchemas);
}

3.4.2 Source Code: Reading the Binlog Between the Low and High Watermarks

// BinlogSplitReader
/**
 * Returns the record should emit or not.
 *
 * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event
 * that belongs to its finished snapshot splits. For each snapshot split, the binlog event is
 * valid since the offset is after its high watermark.
 *
 * <pre> E.g: the data input is :
 *    snapshot-split-0 info : [0,    1024) highWatermark0
 *    snapshot-split-1 info : [1024, 2048) highWatermark1
 * the data output is:
 *    only the binlog event belong to [0,    1024) and offset is after highWatermark0 should send,
 *    only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
 * </pre>
 */
private boolean shouldEmit(SourceRecord sourceRecord) {
    if (isDataChangeRecord(sourceRecord)) {
        TableId tableId = getTableId(sourceRecord);
        BinlogOffset position = getBinlogPosition(sourceRecord);
        // aligned, all snapshot splits of the table has reached max highWatermark
        if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
            return true;
        }
        Object[] key =
                getSplitKey(
                        currentBinlogSplit.getSplitKeyType(),
                        sourceRecord,
                        statefulTaskContext.getSchemaNameAdjuster());
        for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
            if (RecordUtils.splitKeyRangeContains(
                            key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                    && position.isAtOrBefore(splitInfo.getHighWatermark())) {
                return true;
            }
        }
        // not in the monitored splits scope, do not emit
        return false;
    }
    // always send the schema change event and signal event
    // we need record them to state of Flink
    return true;
}
