赞
踩
最近项目需要使用Debezium指定binlog位置读取数据,当配置FileDatabaseHistory 保存offset就会出现无法识别schema异常。
14:52:18.237 [blc-9.135.12.10:3307] ERROR io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Encountered change event 'Event{header=EventHeaderV4{timestamp=1652710509000, eventType=TABLE_MAP, serverId=123454, headerLength=19, dataLength=30, nextPosition=5761, flags=0}, data=TableMapEventData{tableId=375, database='etl', table='test', columnTypes=3, 15, columnMetadata=0, 64, columnNullability={0, 1}, eventMetadata=null}}' at offset {transaction_id=null, file=mysql-bin.000001, pos=5641, server_id=123454, event=1} for table etl.test whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.
Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position=5712 --stop-position=5761 --verbose mysql-bin.000001
14:52:18.237 [blc-9.135.12.10:3307] ERROR io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Error during binlog processing. Last offset stored = {transaction_id=null, file=mysql-bin.000001, pos=5641, server_id=249, event=1}, binlog reader near position = mysql-bin.000001/5712
14:52:18.237 [blc-9.135.12.10:3307] ERROR io.debezium.pipeline.ErrorHandler - Producer failure
io.debezium.DebeziumException: Error processing binlog event
Caused by: io.debezium.DebeziumException: Encountered change event for table etl.test whose schema isn't known to this connector
我们配置引擎参数 database.history
为 FileDatabaseHistory 类来保存我们读取的binlog记录。通过FlinkOffsetBackingStore 类指定其自定义DebeziumOffset 配置偏移量。
props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
props.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);
props.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());
props.setProperty("snapshot.mode", "schema_only_recovery");
把snapshot.mode
配置为全量+增量(initial),增量(never)模式都没有问题,但是指定位置 (schema_only_recovery)模式就会出现上述异常。
指定位置模式(schema_only_recovery)之前有参考flink cdc实现,通过FlinkOffsetBackingStore 类指定其自定义DebeziumOffset就可。而使用database.history 配置FlinkDatabaseHistory 代替FileDatabaseHistory时,指定可用的binlog位置就不会抛出异常。仔细对比两个类的实现也难以看出原因。
因为知道FlinkDatabaseHistory 可以正常执行,所以用两个DatabaseHistory分别测试对比输出日志,发现FlinkDatabaseHistory多了一段重要的初始化步骤
根据日志可以搜索MySqlSnapshotChangeEventSource 类,找到日志关键字输出部分
“A previous offset indicating a completed”
跟着日志执行,我们会发现在上一层,AbstractSnapshotChangeEventSource.execute调用getSnapshottingTask方法的下面。有一个判断Skipping snapshotting 逻辑。
测试就会发现这个方法后面的逻辑就是读取快照初始化相关库表。而FileDatabaseHistory这里的snapshottingTask.shouldSkipSnapshot() 是true,直接跳过了。而FlinkDatabaseHistory是false,往下面继续执行。
所以问题回到snapshottingTask.shouldSkipSnapshot()的逻辑上。snapshotSchema和snapshotData有一个为true,就要执行初始化。
而这连个属性的赋值都是通过构造函数,通过测试发现上面两种的snapshotData都是false, 只有snapshotSchema不同。而这个构造函数又回到我们的步骤1中,两个类的databaseSchema.isStorageInitializationExecuted()不同
MySqlDatabaseSchema.isStorageInitializationExecuted() 就是返回storageInitialiationExecuted的值,而它只在initializeStorage() 一个方法中调用赋值。
不过initializeStorage有多个地方调用。
通过断点回溯方法栈,我们可以发现FlinkDatabaseHistory的调用是在MySqlConnectorTask.validateAndLoadDatabaseHistory 中。而FileDatabaseHistory没有调用则是因为schema.historyExists()返回为真。
接着查看historyExists实现,就是我们DatabaseHistory中重写的方法了。
FileDatabaseHistory 的exists其实就是判断我们指定的文件是否存在。这里要说明下,我们在指定一个新文件时还是会抛出异常,是因为FileDatabaseHistory.start() 方法会先初始化生成对应的文件,所以后续调用就会返回true。
而FlinkDatabaseHistory 则是返回Record列表,在没有事件读入前,自然是空的。
修复
结合FlinkDatabaseHistory 和 FileDatabaseHistory重新实现一个DatabaseHistory,重点把exists 改为!this.schemaRecords.isEmpty();
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。