当前位置:   article > 正文

Flink CDC 2.0 数据处理流程全面解析

flinkcdc如何保证顺序

点击上方蓝色字体,选择“设为星标”

回复”面试“获取更多惊喜

3ca81b17eb9fbf632b71d2aa02334ac9.png

fd4b18fc33e11ad53be9ea4c9f5d599f.png

8月份 FlinkCDC 发布2.0.0版本,相较于1.0版本,在全量读取阶段支持分布式读取、支持checkpoint,且在全量 + 增量读取的过程在不锁表的情况下保障数据一致性。

Flink CDC2.0 数据读取逻辑并不复杂,复杂的是 FLIP-27: Refactor Source Interface 的设计及对Debezium Api的不了解。本文重点对 Flink CDC 的处理逻辑进行介绍, FLIP-27 的设计及 Debezium 的API调用不做过多讲解。

本文先以Flink SQL 案例来介绍Flink CDC2.0的使用,接着介绍CDC中的核心设计包含切片划分、切分读取、增量读取,最后对数据处理过程中涉及flink-mysql-cdc 接口的调用及实现进行代码讲解。

本文发布自公众号:大数据真好玩

案例

全量读取+增量读取 Mysql表数据,以changelog-json 格式写入kafka,观察 RowKind 类型及影响的数据条数。

  1. public static void main(String[] args) {
  2.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
  4.                 .useBlinkPlanner()
  5.                 .inStreamingMode()
  6.                 .build();
  7.         env.setParallelism(3);
  8.         // note: 增量同步需要开启CK
  9.         env.enableCheckpointing(10000);
  10.         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);
  11.             
  12.         tableEnvironment.executeSql(" CREATE TABLE demoOrders (\n" +
  13.                 "         `order_id` INTEGER ,\n" +
  14.                 "          `order_date` DATE ,\n" +
  15.                 "          `order_time` TIMESTAMP(3),\n" +
  16.                 "          `quantity` INT ,\n" +
  17.                 "          `product_id` INT ,\n" +
  18.                 "          `purchaser` STRING,\n" +
  19.                 "           primary key(order_id)  NOT ENFORCED" +
  20.                 "         ) WITH (\n" +
  21.                 "          'connector' = 'mysql-cdc',\n" +
  22.                 "          'hostname' = 'localhost',\n" +
  23.                 "          'port' = '3306',\n" +
  24.                 "          'username' = 'cdc',\n" +
  25.                 "          'password' = '123456',\n" +
  26.                 "          'database-name' = 'test',\n" +
  27.                 "          'table-name' = 'demo_orders'," +
  28.                             //  全量 + 增量同步   
  29.                 "          'scan.startup.mode' = 'initial'      " +
  30.                 " )");
  31.             tableEnvironment.executeSql("CREATE TABLE sink (\n" +
  32.                 "         `order_id` INTEGER ,\n" +
  33.                 "          `order_date` DATE ,\n" +
  34.                 "          `order_time` TIMESTAMP(3),\n" +
  35.                 "          `quantity` INT ,\n" +
  36.                 "          `product_id` INT ,\n" +
  37.                 "          `purchaser` STRING,\n" +
  38.                 "          primary key (order_id)  NOT ENFORCED " +
  39.                 ") WITH (\n" +
  40.                 "    'connector' = 'kafka',\n" +
  41.                 "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
  42.                 "    'topic' = 'mqTest02',\n" +
  43.                 "    'format' = 'changelog-json' "+
  44.                 ")");
  45.             tableEnvironment.executeSql("insert into sink select * from demoOrders");}

全量数据输出:

  1. {"data":{"order_id":1010,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:12.189","quantity":53,"product_id":502,"purchaser":"flink"},"op":"+I"}
  2. {"data":{"order_id":1009,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:09.709","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}
  3. {"data":{"order_id":1008,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:06.637","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
  4. {"data":{"order_id":1007,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:03.535","quantity":52,"product_id":502,"purchaser":"flink"},"op":"+I"}
  5. {"data":{"order_id":1002,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:51.347","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
  6. {"data":{"order_id":1001,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:48.783","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}
  7. {"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 17:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}
  8. {"data":{"order_id":1006,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:01.249","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}
  9. {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
  10. {"data":{"order_id":1004,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:56.153","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}
  11. {"data":{"order_id":1003,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:53.727","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}

修改表数据,增量捕获:

  1. ## 更新 1005 的值 
  2. {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"-U"}
  3. {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:55:43.627","quantity":80,"product_id":503,"purchaser":"flink"},"op":"+U"}
  4. ## 删除 1000 
  5. {"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 09:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"-D"}

核心设计

本文发布自公众号:大数据真好玩

切片划分

全量阶段数据读取方式为分布式读取,会先对当前表数据按主键划分成多个Chunk,后续子任务读取Chunk 区间内的数据。根据主键列是否为自增整数类型,对表数据划分为均匀分布的Chunk及非均匀分布的Chunk。

均匀分布

主键列自增且类型为整数类型(int,bigint,decimal)。查询出主键列的最小值,最大值,按 chunkSize 大小将数据均匀划分,因为主键为整数类型,根据当前chunk 起始位置、chunkSize大小,直接计算chunk 的结束位置。

  1. //  计算主键列数据区间
  2. select min(`order_id`), max(`order_id`) from demo_orders;
  3. //  将数据划分为 chunkSize 大小的切片
  4. chunk-0: [min,start + chunkSize)
  5. chunk-1: [start + chunkSize, start + 2chunkSize)
  6. .......
  7. chunk-last: [max,null)

非均匀分布

主键列非自增或者类型为非整数类型。主键为非数值类型,每次划分需要对未划分的数据按主键进行升序排列,取出前 chunkSize 的最大值为当前 chunk 的结束位置。

  1. // 未拆分的数据排序后,取 chunkSize 条数据取最大值,作为切片的终止位置。
  2. chunkend = SELECT MAX(`order_id`) FROM (
  3.         SELECT `order_id`  FROM `demo_orders` 
  4.         WHERE `order_id` >= [前一个切片的起始位置] 
  5.         ORDER BY `order_id` ASC 
  6.         LIMIT   [chunkSize]  
  7.     ) AS T

全量切片数据读取

Flink 将表数据划分为多个Chunk,子任务在不加锁的情况下,并行读取 Chunk数据。因为全程无锁在数据分片读取过程中,可能有其他事务对切片范围内的数据进行修改,此时无法保证数据一致性。因此,在全量阶段Flink 使用快照记录读取+Binlog数据修正的方式来保证数据的一致性。

快照读取

通过JDBC执行SQL查询切片范围的数据记录。

  1. ## 快照记录数据读取SQL 
  2. SELECT * FROM `test`.`demo_orders` 
  3. WHERE order_id >= [chunkStart] 
  4. AND NOT (order_id = [chunkEnd]) 
  5. AND order_id <= [chunkEnd]

数据修正

在快照读取操作前、后执行 SHOW MASTER STATUS 查询binlog文件的当前偏移量,在快照读取完毕后,查询区间内的binlog数据并对读取的快照记录进行修正。

快照读取+Binlog数据读取时的数据组织结构。

6373e122a46437d9b773ecbfd3df0b25.png

BinlogEvents 修正 SnapshotEvents 规则。

  • 未读取到binlog数据,即在执行select阶段没有其他事务进行操作,直接下发所有快照记录。

  • 读取到binlog数据,且变更的数据记录不属于当前切片,下发快照记录。

  • 读取到binlog数据,且数据记录的变更属于当前切片。delete 操作从快照内存中移除该数据,insert 操作向快照内存添加新的数据,update操作向快照内存中添加变更记录,最终会输出更新前后的两条记录到下游。

修正后的数据组织结构:

57fdbc84a6b67b5308a20758a34b6f02.png

以读取切片[1,11)范围的数据为例,描述切片数据的处理过程。c,d,u代表Debezium捕获到的新增、删除、更新操作。

修正前数据及结构:

2e96a75d8c17ae7dfac62ec978df1e31.png

修正后数据及结构:

4968458befb514a0b8d124f69ad1deca.png

单个切片数据处理完毕后会向 SplitEnumerator 发送已完成切片数据的起始位置(ChunkStart, ChunkStartEnd)、Binlog的最大偏移量(High watermark),用来为增量读取指定起始偏移量。

单个切片数据处理完毕后会向 SplitEnumerator 发送已完成切片数据的起始位置(ChunkStart, ChunkStartEnd)、Binlog的最大偏移量(High watermark),用来为增量读取指定起始偏移量。

增量切片数据读取

全量阶段切片数据读取完成后,SplitEnumerator 会下发一个 BinlogSplit 进行增量数据读取。BinlogSplit读取最重要的属性就是起始偏移量,偏移量如果设置过小下游可能会有重复数据,偏移量如果设置过大下游可能是已超期的脏数据。而 Flink CDC增量读取的起始偏移量为所有已完成的全量切片最小的Binlog偏移量,只有满足条件的数据才被下发到下游。

数据下发条件:

  • 捕获的Binlog数据的偏移量 > 数据所属分片的Binlog的最大偏移量。

例如,SplitEnumerator 保留的已完成切片信息为。

2e59457490de1c03df6a6ffbdf1eab14.png

增量读取时,从偏移量 800 开始读取Binlog数据 ,当捕获到数据 <data:123, offset:1500> 时,先找到 123 所属快照分片,并找到对应的最大Binlog 偏移量 800。当前偏移量大于快照读的最大偏移量,则下发数据,否则直接丢弃。

代码详解

本文发布自公众号:大数据真好玩

关于 FLIP-27: Refactor Source Interface 设计不做详细介绍,本文侧重对 flink-mysql-cdc 接口调用及实现进行讲解。

MySqlSourceEnumerator 初始化

SourceCoordinator作为OperatorCoordinator对Source的实现,运行在Master节点,在启动时通过调用MySqlParallelSource#createEnumerator 创建 MySqlSourceEnumerator 并调用start方法,做一些初始化工作。

ad87c668018dded78169dceea031d715.png
  1. 创建 MySqlSourceEnumerator,使用 MySqlHybridSplitAssigner 对全量+增量数据进行切片,使用 MySqlValidator 对 mysql 版本、配置进行校验。

  2. MySqlValidator 校验:

  • mysql版本必须大于等于5.7。

  • binlog_format 配置必须为 ROW。

  • binlog_row_image 配置必须为 FULL。

  1. MySqlSplitAssigner 初始化:

  • 创建 ChunkSplitter用来划分切片。

  • 筛选出要读的表名称。

  1. 启动周期调度线程,要求 SourceReader 向 SourceEnumerator 发送已完成但未发送ACK事件的切片信息。

  1. private void syncWithReaders(int[] subtaskIds, Throwable t) {
  2.     if (t != null) {
  3.         throw new FlinkRuntimeException("Failed to list obtain registered readers due to:", t);
  4.     }
  5.     // when the SourceEnumerator restores or the communication failed between
  6.     // SourceEnumerator and SourceReader, it may missed some notification event.
  7.     // tell all SourceReader(s) to report there finished but unacked splits.
  8.     if (splitAssigner.waitingForFinishedSplits()) {
  9.         for (int subtaskId : subtaskIds) {
  10.             // note: 发送 FinishedSnapshotSplitsRequestEvent 
  11.             context.sendEventToSourceReader(
  12.                     subtaskId, new FinishedSnapshotSplitsRequestEvent());
  13.         }
  14.     }
  15. }

MySqlSourceReader 初始化

SourceOperator 集成了SourceReader,通过OperatorEventGateway 和 SourceCoordinator 进行交互。

79f11491dde616b411d530a4177015bf.png
  1. SourceOperator 在初始化时,通过 MySqlParallelSource 创建 MySqlSourceReader。MySqlSourceReader 通过 SingleThreadFetcherManager 创建Fetcher拉取分片数据,数据以 MySqlRecords 格式写入到 elementsQueue。

  1. MySqlParallelSource#createReader
  2. public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext) throws Exception {
  3.     // note:  数据存储队列
  4. FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
  5.         new FutureCompletingBlockingQueue<>();
  6. final Configuration readerConfiguration = getReaderConfig(readerContext);
  7.     // note: Split Reader 工厂类
  8. Supplier<MySqlSplitReader> splitReaderSupplier =
  9.         () -> new MySqlSplitReader(readerConfiguration, readerContext.getIndexOfSubtask());
  10. return new MySqlSourceReader<>(
  11.         elementsQueue,
  12.         splitReaderSupplier,
  13.         new MySqlRecordEmitter<>(deserializationSchema),
  14.         readerConfiguration,
  15.         readerContext);
  16. }
  1. 将创建的 MySqlSourceReader 以事件的形式传递给 SourceCoordinator 进行注册。SourceCoordinator 接收到注册事件后,将reader 地址及索引进行保存。

  1. SourceCoordinator#handleReaderRegistrationEvent
  2. // note: SourceCoordinator 处理Reader 注册事件
  3. private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
  4.     context.registerSourceReader(new ReaderInfo(event.subtaskId(), event.location()));
  5.     enumerator.addReader(event.subtaskId());
  6. }
  1. MySqlSourceReader 启动后会向 MySqlSourceEnumerator 发送请求分片事件,从而收集分配的切片数据。

  2. SourceOperator 初始化完毕后,调用 emitNext 由 SourceReaderBase 从 elementsQueue 获取数据集合并下发给 MySqlRecordEmitter。接口调用示意图:

a94e3c7ba89173ccbe0323b6c9c57488.png

MySqlSourceEnumerator 处理分片请求

MySqlSourceReader 启动时会向 MySqlSourceEnumerator 发送请求 RequestSplitEvent 事件,根据返回的切片范围读取区间数据。MySqlSourceEnumerator 全量读取阶段分片请求处理逻辑,最终返回一个MySqlSnapshotSplit。

c0cb0b23087c6cdbc98e628deb7369f5.png
  1. 处理切片请求事件,为请求的Reader分配切片,通过发送AddSplitEvent时间传递MySqlSplit(全量阶段MySqlSnapshotSplit、增量阶段MySqlBinlogSplit)。

  1. MySqlSourceEnumerator#handleSplitRequest
  2. public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
  3.     if (!context.registeredReaders().containsKey(subtaskId)) {
  4.         // reader failed between sending the request and now. skip this request.
  5.         return;
  6.     }
  7.     // note:  将reader所属的subtaskId存储到TreeSet, 在处理binlog split时优先分配个task-0
  8.     readersAwaitingSplit.add(subtaskId);
  9.     assignSplits();
  10. }
  11. // note: 分配切片
  12. private void assignSplits() {
  13.     final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
  14.     while (awaitingReader.hasNext()) {
  15.         int nextAwaiting = awaitingReader.next();
  16.         // if the reader that requested another split has failed in the meantime, remove
  17.         // it from the list of waiting readers
  18.         if (!context.registeredReaders().containsKey(nextAwaiting)) {
  19.             awaitingReader.remove();
  20.             continue;
  21.         }
  22.         //note: 由 MySqlSplitAssigner 分配切片
  23.         Optional<MySqlSplit> split = splitAssigner.getNext();
  24.         if (split.isPresent()) {
  25.             final MySqlSplit mySqlSplit = split.get();
  26.             //  note: 发送AddSplitEvent, 为 Reader 返回切片信息
  27.             context.assignSplit(mySqlSplit, nextAwaiting);
  28.             awaitingReader.remove();
  29.             LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
  30.         } else {
  31.             // there is no available splits by now, skip assigning
  32.             break;
  33.         }
  34.     }
  35. }
  1. MySqlHybridSplitAssigner 处理全量切片、增量切片的逻辑。

  • 任务刚启动时,remainingTables不为空,noMoreSplits返回值为false,创建 SnapshotSplit。

  • 全量阶段分片读取完成后,noMoreSplits返回值为true, 创建 BinlogSplit。

  1. MySqlHybridSplitAssigner#getNext
  2. @Override
  3. public Optional<MySqlSplit> getNext() {
  4.     if (snapshotSplitAssigner.noMoreSplits()) {
  5.         // binlog split assigning
  6.         if (isBinlogSplitAssigned) {
  7.             // no more splits for the assigner
  8.             return Optional.empty();
  9.         } else if (snapshotSplitAssigner.isFinished()) {
  10.             // we need to wait snapshot-assigner to be finished before
  11.             // assigning the binlog split. Otherwise, records emitted from binlog split
  12.             // might be out-of-order in terms of same primary key with snapshot splits.
  13.             isBinlogSplitAssigned = true;
  14.             //note: snapshot split 切片完成后,创建BinlogSplit。
  15.             return Optional.of(createBinlogSplit());
  16.         } else {
  17.             // binlog split is not ready by now
  18.             return Optional.empty();
  19.         }
  20.     } else {
  21.         // note: 由MySqlSnapshotSplitAssigner 创建 SnapshotSplit
  22.         // snapshot assigner still have remaining splits, assign split from it
  23.         return snapshotSplitAssigner.getNext();
  24.     }
  25. }
  1. MySqlSnapshotSplitAssigner 处理全量切片逻辑,通过 ChunkSplitter 生成切片,并存储到Iterator中。

  1. @Override
  2. public Optional<MySqlSplit> getNext() {
  3.     if (!remainingSplits.isEmpty()) {
  4.         // return remaining splits firstly
  5.         Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator();
  6.         MySqlSnapshotSplit split = iterator.next();
  7.         iterator.remove();
  8.         
  9.         //note: 已分配的切片存储到 assignedSplits 集合
  10.         assignedSplits.put(split.splitId(), split);
  11.         return Optional.of(split);
  12.     } else {
  13.         // note: 初始化阶段 remainingTables 存储了要读取的表名
  14.         TableId nextTable = remainingTables.pollFirst();
  15.         if (nextTable != null) {
  16.             // split the given table into chunks (snapshot splits)
  17.             //  note: 初始化阶段创建了 ChunkSplitter,调用generateSplits 进行切片划分
  18.             Collection<MySqlSnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
  19.             //  note: 保留所有切片信息
  20.             remainingSplits.addAll(splits);
  21.             //  note: 已经完成分片的 Table
  22.             alreadyProcessedTables.add(nextTable);
  23.             //  note: 递归调用该该方法
  24.             return getNext();
  25.         } else {
  26.             return Optional.empty();
  27.         }
  28.     }
  29. }
  1. ChunkSplitter 将表划分为均匀分布 or 不均匀分布切片的逻辑。读取的表必须包含物理主键。

  1. public Collection<MySqlSnapshotSplit> generateSplits(TableId tableId) {
  2.     Table schema = mySqlSchema.getTableSchema(tableId).getTable();
  3.     List<Column> primaryKeys = schema.primaryKeyColumns();
  4.     // note: 必须有主键
  5.     if (primaryKeys.isEmpty()) {
  6.         throw new ValidationException(
  7.                 String.format(
  8.                         "Incremental snapshot for tables requires primary key,"
  9.                                 + " but table %s doesn't have primary key.",
  10.                         tableId));
  11.     }
  12.     // use first field in primary key as the split key
  13.     Column splitColumn = primaryKeys.get(0);
  14.     final List<ChunkRange> chunks;
  15.     try {
  16.          // note: 按主键列将数据划分成多个切片
  17.         chunks = splitTableIntoChunks(tableId, splitColumn);
  18.     } catch (SQLException e) {
  19.         throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
  20.     }
  21.     //note: 主键数据类型转换、ChunkRange 包装成MySqlSnapshotSplit。
  22.     // convert chunks into splits
  23.     List<MySqlSnapshotSplit> splits = new ArrayList<>();
  24.     RowType splitType = splitType(splitColumn);
  25.  
  26.     for (int i = 0; i < chunks.size(); i++) {
  27.         ChunkRange chunk = chunks.get(i);
  28.         MySqlSnapshotSplit split =
  29.                 createSnapshotSplit(
  30.                         tableId, i, splitType, chunk.getChunkStart(), chunk.getChunkEnd());
  31.         splits.add(split);
  32.     }
  33.     return splits;
  34. }
  1. splitTableIntoChunks 根据物理主键划分切片。

  1. private List<ChunkRange> splitTableIntoChunks(TableId tableId, Column splitColumn)
  2.         throws SQLException {
  3.     final String splitColumnName = splitColumn.name();
  4.     //  select min, max
  5.     final Object[] minMaxOfSplitColumn = queryMinMax(jdbc, tableId, splitColumnName);
  6.     final Object min = minMaxOfSplitColumn[0];
  7.     final Object max = minMaxOfSplitColumn[1];
  8.     if (min == null || max == null || min.equals(max)) {
  9.         // empty table, or only one row, return full table scan as a chunk
  10.         return Collections.singletonList(ChunkRange.all());
  11.     }
  12.     final List<ChunkRange> chunks;
  13.     if (splitColumnEvenlyDistributed(splitColumn)) {
  14.         // use evenly-sized chunks which is much efficient
  15.         // note: 按主键均匀划分
  16.         chunks = splitEvenlySizedChunks(min, max);
  17.     } else {
  18.         // note: 按主键非均匀划分
  19.         // use unevenly-sized chunks which will request many queries and is not efficient.
  20.         chunks = splitUnevenlySizedChunks(tableId, splitColumnName, min, max);
  21.     }
  22.     return chunks;
  23. }
  24. /** Checks whether split column is evenly distributed across its range. */
  25. private static boolean splitColumnEvenlyDistributed(Column splitColumn) {
  26.     // only column is auto-incremental are recognized as evenly distributed.
  27.     // TODO: we may use MAX,MIN,COUNT to calculate the distribution in the future.
  28.     if (splitColumn.isAutoIncremented()) {
  29.         DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn);
  30.         LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
  31.         // currently, we only support split column with type BIGINT, INT, DECIMAL
  32.         return typeRoot == LogicalTypeRoot.BIGINT
  33.                 || typeRoot == LogicalTypeRoot.INTEGER
  34.                 || typeRoot == LogicalTypeRoot.DECIMAL;
  35.     } else {
  36.         return false;
  37.     }
  38. }
  39. /**
  40.  *  根据拆分列的最小值和最大值将表拆分为大小均匀的块,并以 {@link #chunkSize} 步长滚动块。
  41.  * Split table into evenly sized chunks based on the numeric min and max value of split column,
  42.  * and tumble chunks in {@link #chunkSize} step size.
  43.  */
  44. private List<ChunkRange> splitEvenlySizedChunks(Object min, Object max) {
  45.     if (ObjectUtils.compare(ObjectUtils.plus(min, chunkSize), max) > 0) {
  46.         // there is no more than one chunk, return full table as a chunk
  47.         return Collections.singletonList(ChunkRange.all());
  48.     }
  49.     final List<ChunkRange> splits = new ArrayList<>();
  50.     Object chunkStart = null;
  51.     Object chunkEnd = ObjectUtils.plus(min, chunkSize);
  52.     //  chunkEnd <= max
  53.     while (ObjectUtils.compare(chunkEnd, max) <= 0) {
  54.         splits.add(ChunkRange.of(chunkStart, chunkEnd));
  55.         chunkStart = chunkEnd;
  56.         chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
  57.     }
  58.     // add the ending split
  59.     splits.add(ChunkRange.of(chunkStart, null));
  60.     return splits;
  61. }
  62. /**   通过连续计算下一个块最大值,将表拆分为大小不均匀的块。
  63.  * Split table into unevenly sized chunks by continuously calculating next chunk max value. */
  64. private List<ChunkRange> splitUnevenlySizedChunks(
  65.         TableId tableId, String splitColumnName, Object min, Object max) throws SQLException {
  66.     final List<ChunkRange> splits = new ArrayList<>();
  67.     Object chunkStart = null;
  68.     Object chunkEnd = nextChunkEnd(min, tableId, splitColumnName, max);
  69.     int count = 0;
  70.     while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
  71.         // we start from [null, min + chunk_size) and avoid [null, min)
  72.         splits.add(ChunkRange.of(chunkStart, chunkEnd));
  73.         // may sleep a while to avoid DDOS on MySQL server
  74.         maySleep(count++);
  75.         chunkStart = chunkEnd;
  76.         chunkEnd = nextChunkEnd(chunkEnd, tableId, splitColumnName, max);
  77.     }
  78.     // add the ending split
  79.     splits.add(ChunkRange.of(chunkStart, null));
  80.     return splits;
  81. }
  82. private Object nextChunkEnd(
  83.         Object previousChunkEnd, TableId tableId, String splitColumnName, Object max)
  84.         throws SQLException {
  85.     // chunk end might be null when max values are removed
  86.     Object chunkEnd =
  87.             queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
  88.     if (Objects.equals(previousChunkEnd, chunkEnd)) {
  89.         // we don't allow equal chunk start and end,
  90.         // should query the next one larger than chunkEnd
  91.         chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
  92.     }
  93.     if (ObjectUtils.compare(chunkEnd, max) >= 0) {
  94.         return null;
  95.     } else {
  96.         return chunkEnd;
  97.     }
  98. }

MySqlSourceReader 处理切片分配请求

c603addfd905668f4355d0476beb78fc.png

MySqlSourceReader接收到切片分配请求后,会为先创建一个 SplitFetcher线程,向 taskQueue 添加、执行AddSplitsTask 任务用来处理添加分片任务,接着执行 FetchTask 使用Debezium API进行读取数据,读取的数据存储到elementsQueue中,SourceReaderBase 会从该队列中获取数据,并下发给 MySqlRecordEmitter。

  1. 处理切片分配事件时,创建SplitFetcher向taskQueue添加AddSplitsTask。

  1. SingleThreadFetcherManager#addSplits
  2. public void addSplits(List<SplitT> splitsToAdd) {
  3.     SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
  4.     if (fetcher == null) {
  5.         fetcher = createSplitFetcher();
  6.         // Add the splits to the fetchers.
  7.         fetcher.addSplits(splitsToAdd);
  8.         startFetcher(fetcher);
  9.     } else {
  10.         fetcher.addSplits(splitsToAdd);
  11.     }
  12. }
  13. // 创建 SplitFetcher
  14. protected synchronized SplitFetcher<E, SplitT> createSplitFetcher() {
  15.     if (closed) {
  16.         throw new IllegalStateException("The split fetcher manager has closed.");
  17.     }
  18.     // Create SplitReader.
  19.     SplitReader<E, SplitT> splitReader = splitReaderFactory.get();
  20.     int fetcherId = fetcherIdGenerator.getAndIncrement();
  21.     SplitFetcher<E, SplitT> splitFetcher =
  22.             new SplitFetcher<>(
  23.                     fetcherId,
  24.                     elementsQueue,
  25.                     splitReader,
  26.                     errorHandler,
  27.                     () -> {
  28.                         fetchers.remove(fetcherId);
  29.                         elementsQueue.notifyAvailable();
  30.                     });
  31.     fetchers.put(fetcherId, splitFetcher);
  32.     return splitFetcher;
  33. }
  34. public void addSplits(List<SplitT> splitsToAdd) {
  35.     enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
  36.     wakeUp(true);
  37. }
  1. 执行 SplitFetcher线程,首次执行 AddSplitsTask 线程添加分片,以后执行 FetchTask 线程拉取数据。

  1. SplitFetcher#runOnce
  2. void runOnce() {
  3.     try {
  4.         if (shouldRunFetchTask()) {
  5.             runningTask = fetchTask;
  6.         } else {
  7.             runningTask = taskQueue.take();
  8.         }
  9.         
  10.         if (!wakeUp.get() && runningTask.run()) {
  11.             LOG.debug("Finished running task {}", runningTask);
  12.             runningTask = null;
  13.             checkAndSetIdle();
  14.         }
  15.     } catch (Exception e) {
  16.         throw new RuntimeException(
  17.                 String.format(
  18.                         "SplitFetcher thread %d received unexpected exception while polling the records",
  19.                         id),
  20.                 e);
  21.     }
  22.     maybeEnqueueTask(runningTask);
  23.     synchronized (wakeUp) {
  24.         // Set the running task to null. It is necessary for the shutdown method to avoid
  25.         // unnecessarily interrupt the running task.
  26.         runningTask = null;
  27.         // Set the wakeUp flag to false.
  28.         wakeUp.set(false);
  29.         LOG.debug("Cleaned wakeup flag.");
  30.     }
  31. }
  1. AddSplitsTask 调用 MySqlSplitReader 的 handleSplitsChanges方法,向切片队列中添加已分配的切片信息。在下一次fetch()调用时,从队列中获取切片并读取切片数据。

  1. AddSplitsTask#run
  2. public boolean run() {
  3.     for (SplitT s : splitsToAdd) {
  4.         assignedSplits.put(s.splitId(), s);
  5.     }
  6.     splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
  7.     return true;
  8. }
  9. MySqlSplitReader#handleSplitsChanges
  10. public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
  11.     if (!(splitsChanges instanceof SplitsAddition)) {
  12.         throw new UnsupportedOperationException(
  13.                 String.format(
  14.                         "The SplitChange type of %s is not supported.",
  15.                         splitsChanges.getClass()));
  16.     }
  17.     //note: 添加切片 到队列。
  18.     splits.addAll(splitsChanges.splits());
  19. }
  1. MySqlSplitReader 执行fetch(),由DebeziumReader读取数据到事件队列,在对数据修正后以MySqlRecords格式返回。

  1. MySqlSplitReader#fetch
  2. @Override
  3. public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
  4.     // note: 创建Reader 并读取数据
  5.     checkSplitOrStartNext();
  6.     Iterator<SourceRecord> dataIt = null;
  7.     try {
  8.         // note:  对读取的数据进行修正
  9.         dataIt = currentReader.pollSplitRecords();
  10.     } catch (InterruptedException e) {
  11.         LOG.warn("fetch data failed.", e);
  12.         throw new IOException(e);
  13.     }
  14.     //  note: 返回的数据被封装为 MySqlRecords 进行传输
  15.     return dataIt == null
  16.             ? finishedSnapshotSplit()   
  17.             : MySqlRecords.forRecords(currentSplitId, dataIt);
  18. }
  19. private void checkSplitOrStartNext() throws IOException {
  20.     // the binlog reader should keep alive
  21.     if (currentReader instanceof BinlogSplitReader) {
  22.         return;
  23.     }
  24.     if (canAssignNextSplit()) {
  25.         // note:  从切片队列读取MySqlSplit
  26.         final MySqlSplit nextSplit = splits.poll();
  27.         if (nextSplit == null) {
  28.             throw new IOException("Cannot fetch from another split - no split remaining");
  29.         }
  30.         currentSplitId = nextSplit.splitId();
  31.         // note:  区分全量切片读取还是增量切片读取
  32.         if (nextSplit.isSnapshotSplit()) {
  33.             if (currentReader == null) {
  34.                 final MySqlConnection jdbcConnection = getConnection(config);
  35.                 final BinaryLogClient binaryLogClient = getBinaryClient(config);
  36.                 final StatefulTaskContext statefulTaskContext =
  37.                         new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
  38.                 // note: 创建SnapshotSplitReader,使用Debezium Api读取分配数据及区间Binlog值
  39.                 currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
  40.             }
  41.         } else {
  42.             // point from snapshot split to binlog split
  43.             if (currentReader != null) {
  44.                 LOG.info("It's turn to read binlog split, close current snapshot reader");
  45.                 currentReader.close();
  46.             }
  47.             final MySqlConnection jdbcConnection = getConnection(config);
  48.             final BinaryLogClient binaryLogClient = getBinaryClient(config);
  49.             final StatefulTaskContext statefulTaskContext =
  50.                     new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
  51.             LOG.info("Create binlog reader");
  52.             // note: 创建BinlogSplitReader,使用Debezium API进行增量读取
  53.             currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
  54.         }
  55.         // note: 执行Reader进行数据读取
  56.         currentReader.submitSplit(nextSplit);
  57.     }
  58. }

DebeziumReader 数据处理

本文发布自公众号:大数据真好玩

DebeziumReader 包含全量切片读取、增量切片读取两个阶段,数据读取后存储到 ChangeEventQueue,执行pollSplitRecords 时对数据进行修正。

  1. SnapshotSplitReader 全量切片读取。全量阶段的数据读取通过执行Select语句查询出切片范围内的表数据,在写入队列前后执行 SHOW MASTER STATUS 时,写入当前偏移量。

  1. public void submitSplit(MySqlSplit mySqlSplit) {
  2.     ......
  3.     executor.submit(
  4.             () -> {
  5.                 try {
  6.                     currentTaskRunning = true;
  7.                     // note: 数据读取,在数据前后插入Binlog当前偏移量
  8.                     // 1. execute snapshot read task。 
  9.                     final SnapshotSplitChangeEventSourceContextImpl sourceContext =
  10.                             new SnapshotSplitChangeEventSourceContextImpl();
  11.                     SnapshotResult snapshotResult =
  12.                             splitSnapshotReadTask.execute(sourceContext);
  13.                     //  note: 为增量读取做准备,包含了起始偏移量
  14.                     final MySqlBinlogSplit appendBinlogSplit = createBinlogSplit(sourceContext);
  15.                     final MySqlOffsetContext mySqlOffsetContext =
  16.                             statefulTaskContext.getOffsetContext();
  17.                     mySqlOffsetContext.setBinlogStartPoint(
  18.                             appendBinlogSplit.getStartingOffset().getFilename(),
  19.                             appendBinlogSplit.getStartingOffset().getPosition());
  20.                     //  note: 从起始偏移量开始读取           
  21.                     // 2. execute binlog read task
  22.                     if (snapshotResult.isCompletedOrSkipped()) {
  23.                         // we should only capture events for the current table,
  24.                         Configuration dezConf =
  25.                                 statefulTaskContext
  26.                                         .getDezConf()
  27.                                         .edit()
  28.                                         .with(
  29.                                                 "table.whitelist",
  30.                                                 currentSnapshotSplit.getTableId())
  31.                                         .build();
  32.                         // task to read binlog for current split
  33.                         MySqlBinlogSplitReadTask splitBinlogReadTask =
  34.                                 new MySqlBinlogSplitReadTask(
  35.                                         new MySqlConnectorConfig(dezConf),
  36.                                         mySqlOffsetContext,
  37.                                         statefulTaskContext.getConnection(),
  38.                                         statefulTaskContext.getDispatcher(),
  39.                                         statefulTaskContext.getErrorHandler(),
  40.                                         StatefulTaskContext.getClock(),
  41.                                         statefulTaskContext.getTaskContext(),
  42.                                         (MySqlStreamingChangeEventSourceMetrics)
  43.                                                 statefulTaskContext
  44.                                                         .getStreamingChangeEventSourceMetrics(),
  45.                                         statefulTaskContext
  46.                                                 .getTopicSelector()
  47.                                                 .getPrimaryTopic(),
  48.                                         appendBinlogSplit);
  49.                         splitBinlogReadTask.execute(
  50.                                 new SnapshotBinlogSplitChangeEventSourceContextImpl());
  51.                     } else {
  52.                         readException =
  53.                                 new IllegalStateException(
  54.                                         String.format(
  55.                                                 "Read snapshot for mysql split %s fail",
  56.                                                 currentSnapshotSplit));
  57.                     }
  58.                 } catch (Exception e) {
  59.                     currentTaskRunning = false;
  60.                     LOG.error(
  61.                             String.format(
  62.                                     "Execute snapshot read task for mysql split %s fail",
  63.                                     currentSnapshotSplit),
  64.                             e);
  65.                     readException = e;
  66.                 }
  67.             });
  68. }
  1. SnapshotSplitReader 增量切片读取。增量阶段切片读取重点是判断BinlogSplitReadTask什么时候停止,在读取到分片阶段的结束时的偏移量即终止。

  1. MySqlBinlogSplitReadTask#handleEvent
  2. protected void handleEvent(Event event) {
  3.     // note: 事件下发 队列
  4.     super.handleEvent(event);
  5.     // note: 全量读取阶段需要终止Binlog读取
  6.     // check do we need to stop for read binlog for snapshot split.
  7.     if (isBoundedRead()) {
  8.         final BinlogOffset currentBinlogOffset =
  9.                 new BinlogOffset(
  10.                         offsetContext.getOffset().get(BINLOG_FILENAME_OFFSET_KEY).toString(),
  11.                         Long.parseLong(
  12.                                 offsetContext
  13.                                         .getOffset()
  14.                                         .get(BINLOG_POSITION_OFFSET_KEY)
  15.                                         .toString()));
  16.         // note: currentBinlogOffset > HW 停止读取
  17.         // reach the high watermark, the binlog reader should finished
  18.         if (currentBinlogOffset.isAtOrBefore(binlogSplit.getEndingOffset())) {
  19.             // send binlog end event
  20.             try {
  21.                 signalEventDispatcher.dispatchWatermarkEvent(
  22.                         binlogSplit,
  23.                         currentBinlogOffset,
  24.                         SignalEventDispatcher.WatermarkKind.BINLOG_END);
  25.             } catch (InterruptedException e) {
  26.                 logger.error("Send signal event error.", e);
  27.                 errorHandler.setProducerThrowable(
  28.                         new DebeziumException("Error processing binlog signal event", e));
  29.             }
  30.             //  终止binlog读取
  31.             // tell reader the binlog task finished
  32.             ((SnapshotBinlogSplitChangeEventSourceContextImpl) context).finished();
  33.         }
  34.     }
  35. }
  1. SnapshotSplitReader 执行pollSplitRecords 时对队列中的原始数据进行修正。具体处理逻辑查看 RecordUtils#normalizedSplitRecords。

  1. public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
  2.     if (hasNextElement.get()) {
  3.         // data input: [low watermark event][snapshot events][high watermark event][binlogevents][binlog-end event]
  4.         // data output: [low watermark event][normalized events][high watermark event]
  5.         boolean reachBinlogEnd = false;
  6.         final List<SourceRecord> sourceRecords = new ArrayList<>();
  7.         while (!reachBinlogEnd) {
  8.             // note: 处理队列中写入的 DataChangeEvent 事件
  9.             List<DataChangeEvent> batch = queue.poll();
  10.             for (DataChangeEvent event : batch) {
  11.                 sourceRecords.add(event.getRecord());
  12.                 if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
  13.                     reachBinlogEnd = true;
  14.                     break;
  15.                 }
  16.             }
  17.         }
  18.         // snapshot split return its data once
  19.         hasNextElement.set(false);
  20.         //  ************   修正数据  ***********
  21.         return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
  22.                 .iterator();
  23.     }
  24.     // the data has been polled, no more data
  25.     reachEnd.compareAndSet(falsetrue);
  26.     return null;
  27. }
  1. BinlogSplitReader 数据读取。读取逻辑比较简单,重点是起始偏移量的设置,起始偏移量为所有切片的HW。

  2. BinlogSplitReader 执行pollSplitRecords 时对队列中的原始数据进行修正,保障数据一致性。增量阶段的Binlog读取是无界的,数据会全部下发到事件队列,BinlogSplitReader 通过shouldEmit()判断数据是否下发。

  1. BinlogSplitReader#pollSplitRecords
  2. public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
  3.     checkReadException();
  4.     final List<SourceRecord> sourceRecords = new ArrayList<>();
  5.     if (currentTaskRunning) {
  6.         List<DataChangeEvent> batch = queue.poll();
  7.         for (DataChangeEvent event : batch) {
  8.             if (shouldEmit(event.getRecord())) {
  9.                 sourceRecords.add(event.getRecord());
  10.             }
  11.         }
  12.     }
  13.     return sourceRecords.iterator();
  14. }

事件下发条件:

  1. 新收到的event post 大于 maxwm

  2. 当前 data值所属某个snapshot spilt & 偏移量大于 HWM,下发数据。

  1. /**
  2.  *
  3.  * Returns the record should emit or not.
  4.  *
  5.  * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event that
  6.  * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
  7.  * since the offset is after its high watermark.
  8.  *
  9.  * <pre> E.g: the data input is :
  10.  *    snapshot-split-0 info : [0,    1024) highWatermark0
  11.  *    snapshot-split-1 info : [1024, 2048) highWatermark1
  12.  *  the data output is:
  13.  *  only the binlog event belong to [0,    1024) and offset is after highWatermark0 should send,
  14.  *  only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
  15.  * </pre>
  16.  */
  17. private boolean shouldEmit(SourceRecord sourceRecord) {
  18.     if (isDataChangeRecord(sourceRecord)) {
  19.         TableId tableId = getTableId(sourceRecord);
  20.         BinlogOffset position = getBinlogPosition(sourceRecord);
  21.         // aligned, all snapshot splits of the table has reached max highWatermark
  22.        
  23.         // note:  新收到的event post 大于 maxwm ,直接下发
  24.         if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
  25.             return true;
  26.         }
  27.         Object[] key =
  28.                 getSplitKey(
  29.                         currentBinlogSplit.getSplitKeyType(),
  30.                         sourceRecord,
  31.                         statefulTaskContext.getSchemaNameAdjuster());
  32.         for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
  33.             /**
  34.              *  note: 当前 data值所属某个snapshot spilt &  偏移量大于 HWM,下发数据
  35.              */
  36.             if (RecordUtils.splitKeyRangeContains(
  37.                             key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
  38.                     && position.isAtOrBefore(splitInfo.getHighWatermark())) {
  39.                 return true;
  40.             }
  41.         }
  42.         // not in the monitored splits scope, do not emit
  43.         return false;
  44.     }
  45.     // always send the schema change event and signal event
  46.     // we need record them to state of Flink
  47.     return true;
  48. }
MySqlRecordEmitter 数据下发

SourceReaderBase 从队列中获取切片读取的DataChangeEvent数据集合,将数据类型由Debezium的DataChangeEvent 转换为Flink 的RowData类型。

  1. SourceReaderBase 处理切片数据流程

  1. org.apache.flink.connector.base.source.reader.SourceReaderBase#pollNext
  2. public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
  3.     // make sure we have a fetch we are working on, or move to the next
  4.     RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
  5.     if (recordsWithSplitId == null) {
  6.         recordsWithSplitId = getNextFetch(output);
  7.         if (recordsWithSplitId == null) {
  8.             return trace(finishedOrAvailableLater());
  9.         }
  10.     }
  11.     // we need to loop here, because we may have to go across splits
  12.     while (true) {
  13.         // Process one record.
  14.         // note:  通过MySqlRecords从迭代器中读取单条数据
  15.         final E record = recordsWithSplitId.nextRecordFromSplit();
  16.         if (record != null) {
  17.             // emit the record.
  18.             recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
  19.             LOG.trace("Emitted record: {}", record);
  20.             // We always emit MORE_AVAILABLE here, even though we do not strictly know whether
  21.             // more is available. If nothing more is available, the next invocation will find
  22.             // this out and return the correct status.
  23.             // That means we emit the occasional 'false positive' for availability, but this
  24.             // saves us doing checks for every record. Ultimately, this is cheaper.
  25.             return trace(InputStatus.MORE_AVAILABLE);
  26.         } else if (!moveToNextSplit(recordsWithSplitId, output)) {
  27.             // The fetch is done and we just discovered that and have not emitted anything, yet.
  28.             // We need to move to the next fetch. As a shortcut, we call pollNext() here again,
  29.             // rather than emitting nothing and waiting for the caller to call us again.
  30.             return pollNext(output);
  31.         }
  32.         // else fall through the loop
  33.     }
  34. }
  35. private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) {
  36.     splitFetcherManager.checkErrors();
  37.     LOG.trace("Getting next source data batch from queue");
  38.     // note: 从elementsQueue 获取数据
  39.     final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
  40.     if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
  41.         return null;
  42.     }
  43.     currentFetch = recordsWithSplitId;
  44.     return recordsWithSplitId;
  45. }
  1. MySqlRecords 返回单条数据集合。

  1. com.ververica.cdc.connectors.mysql.source.split.MySqlRecords#nextRecordFromSplit
  2. public SourceRecord nextRecordFromSplit() {
  3.     final Iterator<SourceRecord> recordsForSplit = this.recordsForCurrentSplit;
  4.     if (recordsForSplit != null) {
  5.         if (recordsForSplit.hasNext()) {
  6.             return recordsForSplit.next();
  7.         } else {
  8.             return null;
  9.         }
  10.     } else {
  11.         throw new IllegalStateException();
  12.     }
  13. }
  1. MySqlRecordEmitter 通过 RowDataDebeziumDeserializeSchema 将数据转换为Rowdata。

  1. com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter#emitRecord
  2. public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
  3.     throws Exception {
  4. if (isWatermarkEvent(element)) {
  5.     BinlogOffset watermark = getWatermark(element);
  6.     if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
  7.         splitState.asSnapshotSplitState().setHighWatermark(watermark);
  8.     }
  9. else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
  10.     HistoryRecord historyRecord = getHistoryRecord(element);
  11.     Array tableChanges =
  12.             historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
  13.     TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
  14.     for (TableChanges.TableChange tableChange : changes) {
  15.         splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
  16.     }
  17. else if (isDataChangeRecord(element)) {
  18.     //  note: 数据的处理
  19.     if (splitState.isBinlogSplitState()) {
  20.         BinlogOffset position = getBinlogPosition(element);
  21.         splitState.asBinlogSplitState().setStartingOffset(position);
  22.     }
  23.     debeziumDeserializationSchema.deserialize(
  24.             element,
  25.             new Collector<T>() {
  26.                 @Override
  27.                 public void collect(final T t) {
  28.                     output.collect(t);
  29.                 }
  30.                 @Override
  31.                 public void close() {
  32.                     // do nothing
  33.                 }
  34.             });
  35. else {
  36.     // unknown element
  37.     LOG.info("Meet unknown element {}, just skip.", element);
  38. }
  39. }

RowDataDebeziumDeserializeSchema 序列化过程。

  1. com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema#deserialize
  2. public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
  3.     Envelope.Operation op = Envelope.operationFor(record);
  4.     Struct value = (Struct) record.value();
  5.     Schema valueSchema = record.valueSchema();
  6.     if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
  7.         GenericRowData insert = extractAfterRow(value, valueSchema);
  8.         validator.validate(insert, RowKind.INSERT);
  9.         insert.setRowKind(RowKind.INSERT);
  10.         out.collect(insert);
  11.     } else if (op == Envelope.Operation.DELETE) {
  12.         GenericRowData delete = extractBeforeRow(value, valueSchema);
  13.         validator.validate(delete, RowKind.DELETE);
  14.         delete.setRowKind(RowKind.DELETE);
  15.         out.collect(delete);
  16.     } else {
  17.         GenericRowData before = extractBeforeRow(value, valueSchema);
  18.         validator.validate(before, RowKind.UPDATE_BEFORE);
  19.         before.setRowKind(RowKind.UPDATE_BEFORE);
  20.         out.collect(before);
  21.         GenericRowData after = extractAfterRow(value, valueSchema);
  22.         validator.validate(after, RowKind.UPDATE_AFTER);
  23.         after.setRowKind(RowKind.UPDATE_AFTER);
  24.         out.collect(after);
  25.     }
  26. }

MySqlSourceReader 汇报切片读取完成事件

MySqlSourceReader处理完一个全量切片后,会向MySqlSourceEnumerator发送已完成的切片信息,包含切片ID、HighWatermar ,然后继续发送切片请求。

  1. com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#onSplitFinished
  2. protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
  3. for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
  4.     MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
  5.     finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
  6. }
  7. /**
  8.  *   note: 发送切片完成事件
  9.  */
  10. reportFinishedSnapshotSplitsIfNeed();
  11. //  上一个spilt处理完成后继续发送切片请求
  12. context.sendSplitRequest();
  13. }
  14. private void reportFinishedSnapshotSplitsIfNeed() {
  15.     if (!finishedUnackedSplits.isEmpty()) {
  16.         final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
  17.         for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
  18.             // note: 发送切片ID,及最大偏移量
  19.             finishedOffsets.put(split.splitId(), split.getHighWatermark());
  20.         }
  21.         FinishedSnapshotSplitsReportEvent reportEvent =
  22.                 new FinishedSnapshotSplitsReportEvent(finishedOffsets);
  23.         context.sendSourceEventToCoordinator(reportEvent);
  24.         LOG.debug(
  25.                 "The subtask {} reports offsets of finished snapshot splits {}.",
  26.                 subtaskId,
  27.                 finishedOffsets);
  28.     }
  29. }

MySqlSourceEnumerator 分配增量切片

全量阶段所有分片读取完毕后,MySqlHybridSplitAssigner 会创建BinlogSplit 进行后续增量读取,在创建BinlogSplit 会从全部已完成的全量切片中筛选最小BinlogOffset。注意:2.0.0分支 createBinlogSplit 最小偏移量总是从0开始,最新master分支已经修复这个BUG.

  1. private MySqlBinlogSplit createBinlogSplit() {
  2.     final List<MySqlSnapshotSplit> assignedSnapshotSplit =
  3.             snapshotSplitAssigner.getAssignedSplits().values().stream()
  4.                     .sorted(Comparator.comparing(MySqlSplit::splitId))
  5.                     .collect(Collectors.toList());
  6.     Map<String, BinlogOffset> splitFinishedOffsets =
  7.             snapshotSplitAssigner.getSplitFinishedOffsets();
  8.     final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
  9.     final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
  10.     BinlogOffset minBinlogOffset = null;
  11.     // note: 从所有assignedSnapshotSplit中筛选最小偏移量
  12.     for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
  13.         // find the min binlog offset
  14.         BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
  15.         if (minBinlogOffset == null || binlogOffset.compareTo(minBinlogOffset) < 0) {
  16.             minBinlogOffset = binlogOffset;
  17.         }
  18.         finishedSnapshotSplitInfos.add(
  19.                 new FinishedSnapshotSplitInfo(
  20.                         split.getTableId(),
  21.                         split.splitId(),
  22.                         split.getSplitStart(),
  23.                         split.getSplitEnd(),
  24.                         binlogOffset));
  25.         tableSchemas.putAll(split.getTableSchemas());
  26.     }
  27.     final MySqlSnapshotSplit lastSnapshotSplit =
  28.             assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();
  29.     
  30.     return new MySqlBinlogSplit(
  31.             BINLOG_SPLIT_ID,
  32.             lastSnapshotSplit.getSplitKeyType(),
  33.             minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
  34.             BinlogOffset.NO_STOPPING_OFFSET,
  35.             finishedSnapshotSplitInfos,
  36.             tableSchemas);
  37. }

5482541bcaebe50bad85ab3fd221c61e.png

八千里路云和月 | 从零到大数据专家学习路径指南

我们在学习Flink的时候,到底在学习什么?

193篇文章暴揍Flink,这个合集你需要关注一下

Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS

Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点

我们在学习Spark的时候,到底在学习什么?

在所有Spark模块中,我愿称SparkSQL为最强!

硬刚Hive | 4万字基础调优面试小总结

数据治理方法论和实践小百科全书

标签体系下的用户画像建设小指南

4万字长文 | ClickHouse基础&实践&调优全视角解析

【面试&个人成长】2021年过半,社招和校招的经验之谈

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

当我们在学习Hive的时候在学习什么?「硬刚Hive续集」

你好,我是王知无,一个大数据领域的硬核原创作者。

做过后端架构、数据中间件、数据平台&架构、算法工程化。

专注大数据领域实时动态&技术提升&个人成长&职场进阶,欢迎关注。

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号