Version: 2.2
Conclusion: this is a bug that remains unfixed as of the latest release, 2.3.
On startup, the connector discovers tables roughly as follows:
SHOW DATABASES -- find all databases
-> SHOW FULL TABLES IN `$DBNAME` where Table_Type = 'BASE TABLE' -- find all base tables in each database
-> filter down to the tables that should be captured
-> SHOW MASTER STATUS -- find the current binlog position
-> SHOW CREATE TABLE, falling back to DESC if it fails -- fetch the table schema (see the sketch below)
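To make the flow concrete, here is a minimal, self-contained JDBC sketch of the same steps. It is not the connector's own code; the connection URL, credentials, and the `db_3.0`.`test_table` target are placeholders for illustration.

import java.sql.*;
import java.util.*;

public class DiscoveryFlowSketch {
    public static void main(String[] args) throws SQLException {
        // Placeholder URL/credentials -- adjust for your environment.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306", "root", "password");
             Statement stmt = conn.createStatement()) {

            // Step 1: find all databases.
            List<String> databases = new ArrayList<>();
            try (ResultSet rs = stmt.executeQuery("SHOW DATABASES")) {
                while (rs.next()) {
                    databases.add(rs.getString(1));
                }
            }

            // Step 2: list base tables per database. Backtick quoting is what keeps
            // a dotted name like db_3.0 intact inside the SQL statement.
            for (String db : databases) {
                try (ResultSet rs = stmt.executeQuery(
                        "SHOW FULL TABLES IN `" + db + "` WHERE Table_Type = 'BASE TABLE'")) {
                    while (rs.next()) {
                        System.out.println(db + " -> " + rs.getString(1));
                    }
                }
            }

            // Step 3: read the current binlog position.
            try (ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
                if (rs.next()) {
                    System.out.println("binlog file=" + rs.getString("File")
                            + ", pos=" + rs.getLong("Position"));
                }
            }

            // Step 4: fetch the table schema, falling back to DESC on failure.
            try (ResultSet rs = stmt.executeQuery("SHOW CREATE TABLE `db_3.0`.`test_table`")) {
                if (rs.next()) {
                    System.out.println(rs.getString(2));
                }
            } catch (SQLException e) {
                try (ResultSet rs = stmt.executeQuery("DESC `db_3.0`.`test_table`")) {
                    while (rs.next()) {
                        System.out.println(rs.getString("Field") + " " + rs.getString("Type"));
                    }
                }
            }
        }
    }
}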
// taskmanager log
2023-03-06 18:59:54,158 INFO [617] [com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:140)] - Snapshot step 1 - Determining low watermark {ts_sec=0, file=mysql-bin.000061, pos=167453889, gtids=445780d1-7534-11ed-8e73-b8599fe5c702:1-4891196, row=0, event=0} for split MySqlSnapshotSplit{tableId=db_3.0.test_table, splitId='db_3.0.test_table:0', splitKeyType=[`id` BIGINT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null}
2023-03-06 18:59:54,161 INFO [617] [com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:149)] - Snapshot step 2 - Snapshotting data
2023-03-06 18:59:54,602 DEBUG [624] [com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.createDataEvents(MySqlSnapshotSplitReadTask.java:190)] - Snapshotting table db_3.0.test_table
// The "Snapshot step 3 - Determining high watermark" log never appears after this
// Nor does the "Exporting data from split" log
2023-03-06 18:59:54,162 ERROR [617] [com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.lambda$submitSplit$0(SnapshotSplitReader.java:148)] - Execute snapshot read task for mysql split MySqlSnapshotSplit{tableId=db_3.0.test_table, splitId='db_3.0.test_table:0', splitKeyType=[`id` BIGINT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} fail
io.debezium.DebeziumException: java.lang.NullPointerException
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:120)
// jobmanager log
2023-03-06 16:38:15,242 INFO [47] [com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:88)] - MySQL validation passed.
2023-03-06 16:38:15,243 INFO [47] [com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables(TableDiscoveryUtils.java:52)] - Read list of available databases
2023-03-06 16:38:15,276 INFO [47] [com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables(TableDiscoveryUtils.java:62)] - list of available databases is: [information_schema, mysql, db_3.0, performance_schema, sync, sys, task]
2023-03-06 16:38:15,276 INFO [47] [com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.listTables(TableDiscoveryUtils.java:71)] - Read list of available tables in each database
// The table gets filtered out here
// The tableId value at this point is: db_3.0.test_table
2023-03-06 16:38:15,528 INFO [47] [com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.lambda$listTables$1(TableDiscoveryUtils.java:83)] - 'db_3.0.test_table' is filtered out of capturing
2023-03-06 16:38:15,686 INFO [619] [io.debezium.jdbc.JdbcConnection.lambda$doClose$3(JdbcConnection.java:946)] - Connection gracefully closed
2023-03-06 16:38:15,689 ERROR [47] [org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:200)] - Failed to create Source Enumerator for source Source: cdc_test_table[1]
org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [`db_3.0`] and table-name: [`db_3.0`.`test_table`]
    at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167)
2023-03-06 16:38:15,716 INFO [47] [org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1442)] - Source: cdc_test_table[1] -> ConstraintEnforcer[2] -> Sink: hudi_test_table[2] (2/2) (2cd1a9d5bba6ab1c6c98b15c77af2609) switched from CREATED to SCHEDULED.
2023-03-06 16:38:15,723 INFO [47] [org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:325)] - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: cdc_test_table[1] -> ConstraintEnforcer[2] -> Sink: hudi_test_table[2]' (operator c783d1ea922c420880b56da4fffd1f2a).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [`db_3.0`] and table-name: [`db_3.0`.`test_table`]
    at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167)
    at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:179)
    ... 34 more
2023-03-06 16:38:15,726 INFO [47] [org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1132)] - Job insert-into_default_catalog.default_database.hudi_test_table (000000000001378a0000000000000008) switched from state RUNNING to RESTARTING.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: cdc_test_table[1] -> ConstraintEnforcer[2] -> Sink: hudi_test_table[2]' (operator c783d1ea922c420880b56da4fffd1f2a).
// String form of a table identifier; in this case catalog = db_3.0, schema = null, table = test_table
private static String tableId(String catalog, String schema, String table) {
    if (catalog != null && catalog.length() != 0) {
        return schema != null && schema.length() != 0
                ? catalog + "." + schema + "." + table
                : catalog + "." + table; // null schema -> catalog + "." + table
    } else {
        return schema != null && schema.length() != 0 ? schema + "." + table : table;
    }
}

// flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java
public static List<TableId> discoverCapturedTables(
        JdbcConnection jdbc, MySqlSourceConfig sourceConfig) {
    final List<TableId> capturedTableIds;
    try {
        capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters());
    } catch (SQLException e) {
        throw new FlinkRuntimeException("Failed to discover captured tables", e);
    }
    if (capturedTableIds.isEmpty()) {
        throw new IllegalArgumentException(
                String.format(
                        "Can't find any matched tables, please check your configured database-name: %s and table-name: %s",
                        sourceConfig.getDatabaseList(), sourceConfig.getTableList()));
    }
    return capturedTableIds;
}

// flink-connector-mysql-cdc\src\main\java\com\ververica\cdc\connectors\mysql\source\utils\TableDiscoveryUtils.java
public static List<TableId> listTables(JdbcConnection jdbc, RelationalTableFilters tableFilters)
        throws SQLException {
    final List<TableId> capturedTableIds = new ArrayList<>();
    // -------------------
    // READ DATABASE NAMES
    // -------------------
    // Get the list of databases ...
    LOG.info("Read list of available databases");
    final List<String> databaseNames = new ArrayList<>();
    jdbc.query(
            "SHOW DATABASES",
            rs -> {
                while (rs.next()) {
                    databaseNames.add(rs.getString(1));
                }
            });
    LOG.info("\t list of available databases is: {}", databaseNames);

    // ----------------
    // READ TABLE NAMES
    // ----------------
    // Get the list of table IDs for each database. We can't use a prepared statement with
    // MySQL, so we have to build the SQL statement each time. Although in other cases this
    // might lead to SQL injection, in our case we are reading the database names from the
    // database and not taking them from the user ...
    LOG.info("Read list of available tables in each database");
    for (String dbName : databaseNames) {
        try {
            // quote() wraps dbName in backticks
            jdbc.query(
                    "SHOW FULL TABLES IN " + quote(dbName) + " where Table_Type = 'BASE TABLE'",
                    rs -> {
                        while (rs.next()) {
                            TableId tableId = new TableId(dbName, null, rs.getString(1));
                            if (tableFilters.dataCollectionFilter().isIncluded(tableId)) {
                                capturedTableIds.add(tableId);
                                LOG.info("\t including '{}' for further processing", tableId);
                            } else {
                                LOG.info("\t '{}' is filtered out of capturing", tableId);
                            }
                        }
                    });
        } catch (SQLException e) {
            // We were unable to execute the query or process the results, so skip this ...
            LOG.warn(
                    "\t skipping database '{}' due to error reading tables: {}",
                    dbName,
                    e.getMessage());
        }
    }
    return capturedTableIds;
}
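The string form built above is where a dotted database name becomes ambiguous: new TableId("db_3.0", null, "test_table") renders as db_3.0.test_table, and any code that later pattern-matches or re-parses that string on '.' can no longer tell which dot separates catalog from table. Below is a minimal sketch of that round trip using Debezium's io.debezium.relational.TableId. The re-parsed values shown in the comments are an assumption based on plain dot-splitting and are worth verifying against the Debezium version bundled with your Flink CDC release; they are not stated in the original post.

import io.debezium.relational.TableId;

public class DottedDatabaseNameSketch {
    public static void main(String[] args) {
        // Built the same way as in listTables(): catalog = db_3.0, schema = null.
        TableId discovered = new TableId("db_3.0", null, "test_table");
        System.out.println(discovered);          // db_3.0.test_table

        // Parsing the string form back splits on '.', so the dot inside the
        // database name is indistinguishable from the catalog/table separator.
        TableId reparsed = TableId.parse(discovered.toString());
        System.out.println(reparsed.catalog());  // db_3  (assumption: plain dot-splitting)
        System.out.println(reparsed.schema());   // 0
        System.out.println(reparsed.table());    // test_table
    }
}

This lossy round trip is consistent with the two symptoms in the logs: on the jobmanager side the discovered db_3.0.test_table fails to match the configured filter and discovery ends with "Can't find any matched tables", while on the taskmanager side the snapshot task plausibly ends up looking the table up under a different identifier and hits a NullPointerException.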
public void submitSplit(MySqlSplit mySqlSplit) {
    this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
    statefulTaskContext.configure(currentSnapshotSplit);
    this.queue = statefulTaskContext.getQueue();
    this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();
    this.hasNextElement.set(true);
    this.reachEnd.set(false);
    this.splitSnapshotReadTask =
            new MySqlSnapshotSplitReadTask(
                    statefulTaskContext.getConnectorConfig(),
                    statefulTaskContext.getOffsetContext(),
                    statefulTaskContext.getSnapshotChangeEventSourceMetrics(),
                    statefulTaskContext.getDatabaseSchema(),
                    statefulTaskContext.getConnection(),
                    statefulTaskContext.getDispatcher(),
                    statefulTaskContext.getTopicSelector(),
                    StatefulTaskContext.getClock(),
                    currentSnapshotSplit);
    executor.submit(
            () -> {
                try {
                    currentTaskRunning = true;
                    // execute snapshot read task
                    final SnapshotSplitChangeEventSourceContextImpl sourceContext =
                            new SnapshotSplitChangeEventSourceContextImpl();
                    // private MySqlSnapshotSplitReadTask splitSnapshotReadTask;
                    // com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:120)
                    // The exception (io.debezium.DebeziumException: java.lang.NullPointerException) is thrown here
                    SnapshotResult snapshotResult = splitSnapshotReadTask.execute(sourceContext);

                    final MySqlBinlogSplit backfillBinlogSplit =
                            createBackfillBinlogSplit(sourceContext);
                    // optimization that skip the binlog read when the low watermark equals high
                    // watermark
                    final boolean binlogBackfillRequired =
                            backfillBinlogSplit
                                    .getEndingOffset()
                                    .isAfter(backfillBinlogSplit.getStartingOffset());
                    if (!binlogBackfillRequired) {
                        dispatchBinlogEndEvent(backfillBinlogSplit);
                        currentTaskRunning = false;
                        return;
                    }

                    // execute binlog read task
                    if (snapshotResult.isCompletedOrSkipped()) {
                        final MySqlBinlogSplitReadTask backfillBinlogReadTask =
                                createBackfillBinlogReadTask(backfillBinlogSplit);
                        backfillBinlogReadTask.execute(
                                new SnapshotBinlogSplitChangeEventSourceContextImpl());
                    } else {
                        readException =
                                new IllegalStateException(
                                        String.format(
                                                "Read snapshot for mysql split %s fail",
                                                currentSnapshotSplit));
                    }
                } catch (Exception e) {
                    currentTaskRunning = false;
                    LOG.error( // the ERROR log shown in the taskmanager output is emitted here
                            String.format(
                                    "Execute snapshot read task for mysql split %s fail",
                                    currentSnapshotSplit),
                            e);
                    readException = e;
                }
            });
}

@Override
// flink-connector-mysql-cdc\src\main\java\com\ververica\cdc\connectors\mysql\debezium\task\MySqlSnapshotSplitReadTask.java:105
public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
    SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);
    final SnapshotContext ctx;
    try {
        ctx = prepare(context);
    } catch (Exception e) {
        LOG.error("Failed to initialize snapshot context.", e);
        throw new RuntimeException(e);
    }
    try {
        // The exception originates here; this method reads the binlog position
        return doExecute(context, ctx, snapshottingTask);
    } catch (InterruptedException e) {
        LOG.warn("Snapshot was interrupted before completion");
        throw e;
    } catch (Exception t) {
        throw new DebeziumException(t); // and is wrapped into the DebeziumException seen in the log
    }
}
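Note how the wrapping in execute() shapes the taskmanager log: whatever fails inside doExecute() resurfaces as a DebeziumException whose message is the underlying NullPointerException, and the top-level stack frame points at MySqlSnapshotSplitReadTask.execute(...:120) rather than at the actual null dereference. A stripped-down illustration of that pattern (not connector code; the null field stands in for whatever lookup fails during the snapshot) is below; the real frames of interest live in the wrapped cause.

public class ExceptionWrappingSketch {
    // Local stand-in for io.debezium.DebeziumException, for illustration only.
    static class DebeziumException extends RuntimeException {
        DebeziumException(Throwable cause) { super(cause); }
    }

    static void doExecute() {
        String table = null;   // stand-in for a lookup that came back null
        table.length();        // -> NullPointerException
    }

    static void execute() {
        try {
            doExecute();
        } catch (Exception t) {
            // Same wrapping as MySqlSnapshotSplitReadTask.execute()
            throw new DebeziumException(t);
        }
    }

    public static void main(String[] args) {
        try {
            execute();
        } catch (DebeziumException e) {
            System.out.println(e);           // wrapper whose message is the NPE
            e.getCause().printStackTrace();  // the frame that actually dereferenced null
        }
    }
}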