当前位置:   article > 正文

flinkcdc 原理 + 实践

flinkcdc 原理 + 实践

使用环境

        Flink 1.14.2 + flink cdc 2.2.0

        提示:flinkcdc 2.2版本之后才支持flink 1.14.*,

                  flinkcdc 2.2版本之前不支持 mysql低版本5.6的 cdc.      

CDC1.*版本痛点

  1. 并发

  2. 为了保证一致性,一般通过全量 + 增量进行获取数据。 在全量阶段会进行加锁操作。 通过对 binlog的起始位置和 表的 schema 锁住,保证在全量读取的时候阻止所有新的update。

  3. 全量获取数据的时候不支持checkpoint, fail后需要重新读取。

CDC2.*版本介绍

flinkCDC2.* 优点 :

  并发读取全量数据后无缝转换为单线程读取增量

   通过高水位的方式来替代1.*版本的加锁操作来保证数据一致性。解决1.*版本痛点。

  • chunk切分:

    SourceEnumerator 将表按主键切分chunk.(没有主键可以指定) 分片数表示每个chunk用对应的task并行执 行。达到多并发。 task对 chunk 一对多的关系。 也就是说 一个sourceReader对应一个或多个chunk.

  • chunk分配:

    SourceReader , 每个SourceReader读取表中的一部分数据达到并行读取。

  • chunk读取: (单个Chunk中的数据一致性)

    SourceReader在读取表数据之前,也就是在读取这个chunk切片的时候,会记录当前的binlog位置信息为 低位点。 每个 chunk获取的低位点可能都不一样:比如: 10,30,20 的低位点。

    SourceReader将自身区间内的数据查询并放入buffer(快照读取,等待修正)

    SourceReader查询完成该chunk切片数据之后记录当前binlog位置信息为 高位点。 每个chunk获取的高位点可能也都不一样。比如: 80,70,90 的高位点。

    全量到增量衔接部分:每个chunk消费从低位点到高位点之间的binlog。对buffer数据进行修正。chunk最终的输出则是在对应的该高位点最新的数据。 但是目前只保证了单个chunk中的数据一致性。

  • chunk汇报:

    chunk读取完成之后,会对SourceEnumerator进行汇报,为的是后续的分发binlog chunk. 因为之前的数据针对每个chunk对应的高位点或者低位点获取到的数据范围不一致,为了保证后续继续处理增量的binlog,需要进行分发binlog.

  • chunk分配: (多个Chunk的一致性, 增量阶段)

    在后续需要处理的增量的binlog中,SourceEnumerator会通过下发binlog chunk给任意一个SourceReader进行单并发增量实现。因为增量通常都是数据库单并行操作的,多个task没有意义

    至此切换至增量阶段,会从已完成的全量的所有chunk中筛选出最小的binglog hw。 也就是70开始,当数据到来,判断数据所属chunk,其次,根据状态判断当前数据偏移量如果大于快照时的偏移量 70,那么进行下发,如果小于,则是该chunk的重复数据,直接丢弃。

    解决了全量时期单并发的痛点。并通过高水位方式保证了数据一致性,无缝切换全量 + 增量 阶段。

flink cdc datastream + flink cdc dql 使用

  1. final MySqlSource<String> build = MySqlSource.<String>builder()
  2. .hostname("127.0.0.1")
  3. .port(3307)
  4. .username("root")
  5. .password("123456")
  6. .databaseList("coocaa")
  7. .tableList("coocaa.Course")
  8. .deserializer(new StringDebeziumDeserializationSchema())
  9. .startupOptions(StartupOptions.initial())
  10. // initial: 表示先将之前数据同步,在根据现有的binlog持续更新。
  11. .build();
  12. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13. final DataStreamSource<String> dataStreamSource = env.fromSource(build, WatermarkStrategy.noWatermarks(),"mysql-cdc");
  14. dataStreamSource.print();
  15. env.execute();
  1. // TODO: 初始化flink
  2. StreamExecutionEnvironment streamEnv
  3. = StreamExecutionEnvironment.getExecutionEnvironment();
  4. EnvironmentSettings envSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
  5. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
  6. tableEnv.executeSql("" +
  7. "create table tablea( \n" +
  8. " database_name STRING METADATA VIRTUAL, \n" +
  9. " table_name STRING METADATA VIRTUAL, \n" +
  10. " cloumn1 String, \n" +
  11. " cloumn2 String, \n" +
  12. " cloumn3 String, \n" +
  13. " cloumn4 String, \n" +
  14. " cloumn5 String, \n" +
  15. " cloumn6 String, \n" +
  16. " cloumn7 String, \n" +
  17. " cloumn8 int, \n" +
  18. " cloumn9 int, \n" +
  19. " cloumn10 int, \n" +
  20. " cloumn11 String, \n" +
  21. " PRIMARY KEY (cloumn1) NOT ENFORCED \n" +
  22. " ) with ( \n" +
  23. " 'connector' = 'mysql-cdc', \n" +
  24. " 'hostname' = '***', \n" +
  25. " 'port' = '***', \n" +
  26. " 'database-name' = '***', \n" +
  27. " 'username' = 'root', \n" +
  28. " 'password' = '123456', \n" +
  29. " 'table-name' = '***.*', \n" + //可通过正则进行多库多表
  30. " 'scan.incremental.snapshot.enabled' = 'false', \n" +
  31. " 'scan.startup.mode' = 'initial' \n" + //initial or latest-offset
  32. " )");
  33. tableEnv.from("tablea").printSchema();
  34. tableEnv.executeSql("select * from tablea").print();

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/473042
推荐阅读
相关标签
  

闽ICP备14008679号