当前位置:   article > 正文

Debezium指定binlog位置初始化异常排查修复_error io.debezium.connector.mysql.mysqlstreamingch

error io.debezium.connector.mysql.mysqlstreamingchangeeventsource [] - error

异常现象

最近项目需要使用Debezium指定binlog位置读取数据,当配置FileDatabaseHistory 保存offset就会出现无法识别schema异常。

14:52:18.237 [blc-9.135.12.10:3307] ERROR io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Encountered change event 'Event{header=EventHeaderV4{timestamp=1652710509000, eventType=TABLE_MAP, serverId=123454, headerLength=19, dataLength=30, nextPosition=5761, flags=0}, data=TableMapEventData{tableId=375, database='etl', table='test', columnTypes=3, 15, columnMetadata=0, 64, columnNullability={0, 1}, eventMetadata=null}}' at offset {transaction_id=null, file=mysql-bin.000001, pos=5641, server_id=123454, event=1} for table etl.test whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.
Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position=5712 --stop-position=5761 --verbose mysql-bin.000001
14:52:18.237 [blc-9.135.12.10:3307] ERROR io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Error during binlog processing. Last offset stored = {transaction_id=null, file=mysql-bin.000001, pos=5641, server_id=249, event=1}, binlog reader near position = mysql-bin.000001/5712
14:52:18.237 [blc-9.135.12.10:3307] ERROR io.debezium.pipeline.ErrorHandler - Producer failure
io.debezium.DebeziumException: Error processing binlog event
Caused by: io.debezium.DebeziumException: Encountered change event for table etl.test whose schema isn't known to this connector
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

如何复现

我们配置引擎参数 database.history 为 FileDatabaseHistory 类来保存我们读取的binlog记录。通过FlinkOffsetBackingStore 类指定其自定义DebeziumOffset 配置偏移量。

props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
props.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
props.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
props.setProperty("snapshot.mode", "schema_only_recovery");
  • 1
  • 2
  • 3
  • 4

snapshot.mode 配置为全量+增量(initial),增量(never)模式都没有问题,但是指定位置 (schema_only_recovery)模式就会出现上述异常。

排查定位

线索

指定位置模式(schema_only_recovery)之前有参考flink cdc实现,通过FlinkOffsetBackingStore 类指定其自定义DebeziumOffset就可。而使用database.history 配置FlinkDatabaseHistory 代替FileDatabaseHistory时,指定可用的binlog位置就不会抛出异常。仔细对比两个类的实现也难以看出原因。

定位

  1. 因为知道FlinkDatabaseHistory 可以正常执行,所以用两个DatabaseHistory分别测试对比输出日志,发现FlinkDatabaseHistory多了一段重要的初始化步骤
    在这里插入图片描述

  2. 根据日志可以搜索MySqlSnapshotChangeEventSource 类,找到日志关键字输出部分
    “A previous offset indicating a completed”
    在这里插入图片描述

  3. 跟着日志执行,我们会发现在上一层,AbstractSnapshotChangeEventSource.execute调用getSnapshottingTask方法的下面。有一个判断Skipping snapshotting 逻辑。
    测试就会发现这个方法后面的逻辑就是读取快照初始化相关库表。而FileDatabaseHistory这里的snapshottingTask.shouldSkipSnapshot() 是true,直接跳过了。而FlinkDatabaseHistory是false,往下面继续执行。
    在这里插入图片描述

  4. 所以问题回到snapshottingTask.shouldSkipSnapshot()的逻辑上。snapshotSchema和snapshotData有一个为true,就要执行初始化。
    在这里插入图片描述

  5. 而这连个属性的赋值都是通过构造函数,通过测试发现上面两种的snapshotData都是false, 只有snapshotSchema不同。而这个构造函数又回到我们的步骤1中,两个类的databaseSchema.isStorageInitializationExecuted()不同
    在这里插入图片描述

  6. MySqlDatabaseSchema.isStorageInitializationExecuted() 就是返回storageInitialiationExecuted的值,而它只在initializeStorage() 一个方法中调用赋值。
    在这里插入图片描述

  7. 不过initializeStorage有多个地方调用。
    在这里插入图片描述

  8. 通过断点回溯方法栈,我们可以发现FlinkDatabaseHistory的调用是在MySqlConnectorTask.validateAndLoadDatabaseHistory 中。而FileDatabaseHistory没有调用则是因为schema.historyExists()返回为真。
    在这里插入图片描述

  9. 接着查看historyExists实现,就是我们DatabaseHistory中重写的方法了。
    在这里插入图片描述

  10. FileDatabaseHistory 的exists其实就是判断我们指定的文件是否存在。这里要说明下,我们在指定一个新文件时还是会抛出异常,是因为FileDatabaseHistory.start() 方法会先初始化生成对应的文件,所以后续调用就会返回true。
    在这里插入图片描述

  11. 而FlinkDatabaseHistory 则是返回Record列表,在没有事件读入前,自然是空的。
    在这里插入图片描述

修复
结合FlinkDatabaseHistory 和 FileDatabaseHistory重新实现一个DatabaseHistory,重点把exists 改为!this.schemaRecords.isEmpty();

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/836312
推荐阅读
相关标签
  

闽ICP备14008679号