Notes on problems encountered while using Flink CDC, updated from time to time.
Environment: Flink 1.13.2 standalone, Flink CDC 2.0.2
Problem: syncing data from MySQL to Kafka with the MySQL CDC connector, the first submission runs normally, but after stopping the job with the flink stop command and restarting it from the savepoint, the job submits successfully yet fails to start, and the taskmanager log shows the following error:
- 2021-11-03 11:04:28.060 [Source: Custom Source -> Sink: sink_Kafka (1/1)#914] WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: sink_Kafka (1/1)#914 (d174f4c6f58d2ad3349a2583c28458a6) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 2
- Suppressed: java.lang.NullPointerException
- at com.ververica.cdc.debezium.DebeziumSourceFunction.cancel(DebeziumSourceFunction.java:492)
- at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:160)
- at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:210)
- at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:191)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:653)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
- at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
- at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
- at java.lang.Thread.run(Thread.java:748)
- Suppressed: java.lang.NullPointerException
- at com.ververica.cdc.debezium.DebeziumSourceFunction.cancel(DebeziumSourceFunction.java:492)
- at com.ververica.cdc.debezium.DebeziumSourceFunction.close(DebeziumSourceFunction.java:497)
- at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
- at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753)
- at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
- at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
- at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
- at java.lang.Thread.run(Thread.java:748)
Key error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 2
Solution: the error comes from the kafka-clients package. The project pom declared the kafka-clients dependency with provided scope, while Flink's lib directory contains no kafka-clients jar. Since the Flink cluster could not be restarted at the time, the provided scope was commented out of the pom so the default (compile) scope applies; after repackaging and restarting from the savepoint, the job started normally.
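The pom change amounts to dropping the provided scope so kafka-clients gets bundled into the job jar; a sketch (the version property is a placeholder for whatever the project actually uses):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    <!-- <scope>provided</scope> -->  <!-- commented out: default compile scope bundles the jar -->
</dependency>
```

Bundling the client into the fat jar avoids touching ${FLINK_HOME}/lib; once the cluster can be restarted, placing a kafka-clients jar under lib and restoring the provided scope would be the cleaner long-term fix.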
Environment: Flink 1.13.2 standalone, Flink CDC 2.1.0, DataStream API
Problem: after upgrading the Flink CDC dependency to 2.1.0, submitting the jar brings down the JobManager, and the JobManager log shows the following error:
- org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not start RpcEndpoint jobmanager_5.
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:610)
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180)
- at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
- at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
- at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
- at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
- at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
- at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
- at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
- at akka.actor.Actor.aroundReceive(Actor.scala:517)
- at akka.actor.Actor.aroundReceive$(Actor.scala:515)
- at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
- at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
- at akka.actor.ActorCell.invoke(ActorCell.scala:561)
- at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
- at akka.dispatch.Mailbox.run(Mailbox.scala:225)
- at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
- at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
- at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
- at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
- at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
- Caused by: org.apache.flink.runtime.jobmaster.JobMasterException: Could not start the JobMaster.
- at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:385)
- at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605)
- ... 20 common frames omitted
- Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
- at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:90)
- at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592)
- at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955)
- at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873)
- at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383)
- ... 22 common frames omitted
- Caused by: java.lang.NullPointerException: null
- at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
- at com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig.<init>(MySqlSourceConfig.java:90)
- at com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory.createConfig(MySqlSourceConfigFactory.java:287)
- at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:153)
- at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:124)
- at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291)
- at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70)
- at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194)
- at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85)
- ... 26 common frames omitted
Solution: the log shows that com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig null-checks some of its parameters. Reading the source shows that in Flink CDC 2.1.0 the databaseList parameter must not be null, while in 2.0.2 and earlier it could be left unset; after setting this parameter in the code, the job runs normally.
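The NPE is just a failed precondition check in the config constructor; conceptually (using Objects.requireNonNull to stand in for Flink's Preconditions.checkNotNull, which behaves the same way — the message string here is hypothetical):

```java
import java.util.Objects;

public class DatabaseListCheck {
    public static void main(String[] args) {
        // Hypothetical: mirrors what happens when the builder's databaseList(...)
        // was never called before MySqlSourceConfig is constructed in 2.1.0.
        String[] databaseList = null;
        try {
            Objects.requireNonNull(databaseList, "databaseList must not be null");
        } catch (NullPointerException e) {
            // The operator coordinator start fails with this kind of NPE.
            System.out.println("enumerator creation failed: " + e.getMessage());
        }
    }
}
```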
Environment: Flink 1.13.2, Flink CDC 2.1.0, DataStream API / Flink SQL
Problem: same as GitHub issue #628. After upgrading the Flink CDC dependency to 2.1.0, submitting a job brings down the JobManager, and the JobManager log shows the following error:
- org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not start RpcEndpoint jobmanager_5.
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:610)
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180)
- at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
- at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
- at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
- at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
- at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
- at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
- at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
- at akka.actor.Actor.aroundReceive(Actor.scala:517)
- at akka.actor.Actor.aroundReceive$(Actor.scala:515)
- at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
- at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
- at akka.actor.ActorCell.invoke(ActorCell.scala:561)
- at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
- at akka.dispatch.Mailbox.run(Mailbox.scala:225)
- at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
- at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
- at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
- at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
- at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
- Caused by: org.apache.flink.runtime.jobmaster.JobMasterException: Could not start the JobMaster.
- at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:385)
- at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605)
- ... 20 common frames omitted
- Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
- at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:90)
- at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592)
- at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955)
- at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873)
- at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383)
- ... 22 common frames omitted
- Caused by: org.apache.flink.util.FlinkRuntimeException: java.lang.RuntimeException: Failed to get driver instance for jdbcUrl=jdbc:mysql://rm-t4n708r39tv4o57g7wo.mysql.singapore.rds.aliyuncs.com:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL
- at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:65)
- at com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:68)
- at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:156)
- at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:124)
- at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291)
- at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70)
- at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194)
- at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85)
- ... 26 common frames omitted
- Caused by: java.lang.RuntimeException: Failed to get driver instance for jdbcUrl=jdbc:mysql://rm-t4n708r39tv4o57g7wo.mysql.singapore.rds.aliyuncs.com:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL
- at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:114)
- at com.zaxxer.hikari.pool.PoolBase.initializeDataSource(PoolBase.java:331)
- at com.zaxxer.hikari.pool.PoolBase.<init>(PoolBase.java:114)
- at com.zaxxer.hikari.pool.HikariPool.<init>(HikariPool.java:108)
- at com.zaxxer.hikari.HikariDataSource.<init>(HikariDataSource.java:81)
- at com.ververica.cdc.connectors.mysql.source.connection.PooledDataSourceFactory.createPooledDataSource(PooledDataSourceFactory.java:54)
- at com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionPools.getOrCreateConnectionPool(JdbcConnectionPools.java:51)
- at com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory.connect(JdbcConnectionFactory.java:53)
- at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:872)
- at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:867)
- at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:413)
- at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:62)
- ... 33 common frames omitted
- Caused by: java.sql.SQLException: No suitable driver
- at java.sql.DriverManager.getDriver(DriverManager.java:315)
- at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106)
- ... 44 common frames omitted
Solution: the issue has been fixed and merged into the 2.1 and master branches. Check out the 2.1 branch and build it manually, then replace the jar of the same name under ${FLINK_HOME}/lib with the resulting flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar; after resubmitting, the job runs normally.
In Flink 1.15 the Kafka connector no longer uses FlinkKafkaProducer; it has been replaced by KafkaSink:
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("xxx:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("xxx")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        // producerProperties: a java.util.Properties object with additional producer settings
        .setKafkaProducerConfig(producerProperties)
        .build();
Environment: Flink 1.15.4, Flink CDC 2.4.0, DataStream API
Problem: when syncing a MySQL table through Flink CDC 2.4.0, decimal columns arrive in Kafka as strings; stepping through with a debugger showed the decimal data being handled as Base64. The Debezium documentation describes the decimal.handling.mode option, which defaults to precise, with double and string as the alternatives.
Solution: when building the MysqlSource, create a java.util.Properties object for debeziumProperties() and set decimal.handling.mode to string.
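Why the values look like Base64: in precise mode Debezium uses Kafka Connect's Decimal logical type, i.e. the big-endian two's-complement bytes of the unscaled BigDecimal, which the JSON converter renders as a base64 string (the scale lives in the schema). A minimal decoding sketch, assuming a value of 123.45 with scale 2:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

public class DecimalDecode {
    public static void main(String[] args) {
        // 123.45 with scale 2 has unscaled value 12345, whose bytes 0x30 0x39
        // base64-encode to "MDk=" -- the kind of string that shows up in Kafka.
        byte[] unscaled = Base64.getDecoder().decode("MDk=");
        BigDecimal value = new BigDecimal(new BigInteger(unscaled), 2);
        System.out.println(value); // 123.45
    }
}
```

Switching decimal.handling.mode to string sidesteps this entirely by emitting the decimal as a plain readable string.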
Environment: Flink 1.15.4, Flink CDC 2.4.0, DataStream API
Problem: when creating a MySQL source with the Flink CDC DataStream API, a serverTimeZone that does not match the database's time zone fails immediately with:
- Caused by: org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (28800 seconds ahead of UTC) which does not match the configured timezone UTC-05:00. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
- at com.ververica.cdc.connectors.mysql.MySqlValidator.checkTimeZone(MySqlValidator.java:191) ~[?:?]
- at com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:81) ~[?:?]
- at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:172) ~[?:?]
- at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:198) ~[flink-dist-1.15-vvr-6.0.5-SNAPSHOT.jar:1.15-vvr-6.0.5-SNAPSHOT]
- ... 44 more
Solution: set serverTimeZone to the same time zone as the database.
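The "28800 seconds ahead of UTC" in the message is the server's UTC offset, so any zone id with that offset satisfies the check. A quick sanity check for a candidate serverTimeZone value, using plain java.time (Asia/Shanghai here is just an example of a UTC+8 zone):

```java
import java.time.Instant;
import java.time.ZoneId;

public class ZoneOffsetCheck {
    public static void main(String[] args) {
        // Offset in seconds for a candidate serverTimeZone value; for a server
        // reporting "28800 seconds ahead of UTC" this should print 28800.
        int offsetSeconds = ZoneId.of("Asia/Shanghai").getRules()
                .getOffset(Instant.now()).getTotalSeconds();
        System.out.println(offsetSeconds); // 28800
    }
}
```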
Environment: Flink 1.15.4, Flink CDC 2.4.0, DataStream API, MySQL in UTC+8
Problem: when syncing from MySQL to Kafka with the Flink CDC DataStream API and the bundled JsonDebeziumDeserializationSchema, datetime columns are converted to epoch milliseconds that are 8 hours ahead; against a UTC-5 database the timestamps come out 5 hours behind. The behavior has existed all along and Flink CDC has never changed it.
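The shift can be reproduced with plain java.time: DATETIME has no zone, and the wall-clock value gets converted to epoch milliseconds as if it were UTC, so for a UTC+8 database the result lands 8 hours ahead of the true instant (a sketch of the arithmetic, not the connector's actual code path):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class DatetimeShift {
    public static void main(String[] args) {
        LocalDateTime wallClock = LocalDateTime.of(2021, 1, 1, 0, 0, 0);
        // Interpreted as UTC (what ends up in the JSON) vs. as UTC+8 (the real instant).
        long asUtc = wallClock.toInstant(ZoneOffset.UTC).toEpochMilli();
        long asUtcPlus8 = wallClock.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
        System.out.println((asUtc - asUtcPlus8) / 3_600_000); // 8
    }
}
```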
Solution: if you do not want to implement your own deserialization schema, you can define a custom Debezium converter for the datetime type following the Debezium documentation; if you do want your own deserialization schema, the RowDataDebeziumDeserializeSchema class mentioned in the Flink CDC FAQ is a useful reference. The custom Debezium converter code:
package com.hcloud.flink.tools;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 * Custom converter for the MySQL datetime type.
 */
public class CustomTimeConvert implements CustomConverter<SchemaBuilder, RelationalColumn> {
    private static final Logger log = LoggerFactory.getLogger(CustomTimeConvert.class);

    // Default output format
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Default time zone; if the serverTimeZone parameter is set, this holds its value
    private ZoneId timezoneId = ZoneId.systemDefault();

    @Override
    public void configure(Properties properties) {
        String datetimeFormat = properties.getProperty("datetime.formatter");
        if (datetimeFormat != null && datetimeFormat.trim().length() > 0) {
            datetimeFormatter = DateTimeFormatter.ofPattern(datetimeFormat.trim());
        } else {
            log.warn("datetime.formatter is null, use default formatter: yyyy-MM-dd HH:mm:ss");
        }
    }

    @Override
    public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
        // Column type name
        String sqlType = relationalColumn.typeName().toUpperCase();

        SchemaBuilder schemaBuilder = null;
        Converter converter = null;

        // Custom conversion for the datetime type only
        if ("DATETIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("debezium.mysql.datetime.string");
            converter = this::convertDatetime;
        }

        // Register the converter
        if (schemaBuilder != null) {
            converterRegistration.register(schemaBuilder, converter);
        }
    }

    // Format a datetime value: the snapshot phase delivers java.sql.Timestamp,
    // the binlog phase delivers java.time.LocalDateTime
    private String convertDatetime(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof Timestamp) {
            return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());
        }
        if (input instanceof LocalDateTime) {
            return datetimeFormatter.format((LocalDateTime) input);
        }
        log.warn("field is not java.sql.Timestamp or java.time.LocalDateTime, current field type is {}", input.getClass());
        return null;
    }
}
Usage: add the following configuration when creating the MySQL source
// Build the MySQL source
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("decimal.handling.mode", "string");
debeziumProperties.setProperty("bigint.unsigned.handling.mode", "long");
// Required: the converter name, which is also the prefix for its other options
debeziumProperties.setProperty("converters", "myConverter");
// Required: the class implementing the converter (must match the class defined above)
debeziumProperties.setProperty("myConverter.type", "com.hcloud.flink.tools.CustomTimeConvert");
// Optional (custom option): the output format
debeziumProperties.setProperty("myConverter.datetime.formatter", "yyyy-MM-dd HH:mm:ss");

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("ip")
        .port(3306)
        .databaseList("db")
        .tableList("db.table")
        .username("user")
        .password("password")
        .serverTimeZone("UTC+8")
        .startupOptions(StartupOptions.latest())
        .deserializer(new JsonDebeziumDeserializationSchema())
        .debeziumProperties(debeziumProperties)
        .build();
Since the company currently uses no time types other than datetime, converters for the other temporal types are not implemented; there are plenty of more complete custom Debezium converters online to draw on, such as one shared by a GitHub user.