
Flink CDC Notes

Notes on problems encountered while using Flink CDC, updated from time to time.

Problem Log

1. Unable to start a job from a savepoint

Environment: Flink 1.13.2 standalone, Flink CDC 2.0.2

Symptom: A job using the MySQL CDC connector to sync data to Kafka runs normally on first submission. After stopping it with the flink stop command and then starting it from the savepoint, the job submits successfully but fails to start, and the taskmanager log contains the following error:

2021-11-03 11:04:28.060 [Source: Custom Source -> Sink: sink_Kafka (1/1)#914] WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: sink_Kafka (1/1)#914 (d174f4c6f58d2ad3349a2583c28458a6) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 2
    Suppressed: java.lang.NullPointerException
        at com.ververica.cdc.debezium.DebeziumSourceFunction.cancel(DebeziumSourceFunction.java:492)
        at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:160)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:210)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:191)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:653)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.lang.NullPointerException
        at com.ververica.cdc.debezium.DebeziumSourceFunction.cancel(DebeziumSourceFunction.java:492)
        at com.ververica.cdc.debezium.DebeziumSourceFunction.close(DebeziumSourceFunction.java:497)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)

Key error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 2

Fix: Investigation showed the error comes from the kafka-clients package. The project's pom declared the kafka-clients dependency with provided scope, while Flink's lib directory did not contain a kafka-clients jar. Since the Flink cluster could not be restarted at the time, the workaround was to comment out the provided scope in the pom (falling back to the default compile scope), rebuild the jar, and start the job from the savepoint again, after which it started normally.
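As a sketch, the pom change amounts to commenting out the provided scope so the client classes are bundled into the job's fat jar (the version property here is a placeholder; use whatever version your project already pins):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.clients.version}</version>
    <!-- <scope>provided</scope>  commented out: Flink's lib has no kafka-clients jar,
         so the dependency must ship inside the job jar -->
</dependency>
```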

2. Unable to submit a job after upgrading Flink CDC, part one

Environment: Flink 1.13.2 standalone, Flink CDC 2.1.0, DataStream API

Symptom: After upgrading the Flink CDC dependency to 2.1.0, submitting the jar job brings down the jobmanager, with the following error in the jobmanager log:

org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not start RpcEndpoint jobmanager_5.
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:610)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmaster.JobMasterException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:385)
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605)
    ... 20 common frames omitted
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:90)
    at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592)
    at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955)
    at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873)
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383)
    ... 22 common frames omitted
Caused by: java.lang.NullPointerException: null
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
    at com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig.<init>(MySqlSourceConfig.java:90)
    at com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory.createConfig(MySqlSourceConfigFactory.java:287)
    at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:153)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:124)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194)
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85)
    ... 26 common frames omitted

Fix: The log shows that the constructor of com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig null-checks some of its parameters. Reading the source reveals that in Flink CDC 2.1.0 the databaseList parameter must not be null, whereas up to 2.0.2 it could be omitted. After adding this parameter to the code, the job runs normally.
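A minimal sketch of the change (hostname, credentials, and database/table names are placeholders); the databaseList call is the addition that 2.1.0 requires:

```java
MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("ip")
        .port(3306)
        .databaseList("db")    // required since Flink CDC 2.1.0; could be omitted in 2.0.2
        .tableList("db.table")
        .username("user")
        .password("password")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();
```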

3. Unable to submit a job after upgrading Flink CDC, part two

Environment: Flink 1.13.2, Flink CDC 2.1.0, DataStream API / Flink SQL

Symptom: The same as GitHub issue #628. After upgrading the Flink CDC dependency to 2.1.0, submitting a job brings down the jobmanager, with the following error in the jobmanager log:

org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not start RpcEndpoint jobmanager_5.
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:610)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmaster.JobMasterException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:385)
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:605)
    ... 20 common frames omitted
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:90)
    at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:592)
    at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:955)
    at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:873)
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:383)
    ... 22 common frames omitted
Caused by: org.apache.flink.util.FlinkRuntimeException: java.lang.RuntimeException: Failed to get driver instance for jdbcUrl=jdbc:mysql://rm-t4n708r39tv4o57g7wo.mysql.singapore.rds.aliyuncs.com:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL
    at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:65)
    at com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:68)
    at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:156)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:124)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:291)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194)
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85)
    ... 26 common frames omitted
Caused by: java.lang.RuntimeException: Failed to get driver instance for jdbcUrl=jdbc:mysql://rm-t4n708r39tv4o57g7wo.mysql.singapore.rds.aliyuncs.com:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL
    at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:114)
    at com.zaxxer.hikari.pool.PoolBase.initializeDataSource(PoolBase.java:331)
    at com.zaxxer.hikari.pool.PoolBase.<init>(PoolBase.java:114)
    at com.zaxxer.hikari.pool.HikariPool.<init>(HikariPool.java:108)
    at com.zaxxer.hikari.HikariDataSource.<init>(HikariDataSource.java:81)
    at com.ververica.cdc.connectors.mysql.source.connection.PooledDataSourceFactory.createPooledDataSource(PooledDataSourceFactory.java:54)
    at com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionPools.getOrCreateConnectionPool(JdbcConnectionPools.java:51)
    at com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory.connect(JdbcConnectionFactory.java:53)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:872)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:867)
    at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:413)
    at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:62)
    ... 33 common frames omitted
Caused by: java.sql.SQLException: No suitable driver
    at java.sql.DriverManager.getDriver(DriverManager.java:315)
    at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106)
    ... 44 common frames omitted

Fix: This bug has already been fixed and merged into the 2.1 and master branches. Check out the 2.1 branch, build it manually, replace the jar of the same name under ${FLINK_HOME}/lib with the resulting flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar, and resubmit the job; it then runs normally.
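The build-and-replace steps can be sketched as follows, assuming the community repository and its release-2.1 branch naming (adjust the repository URL, branch name, and FLINK_HOME for your environment):

```shell
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
git checkout release-2.1
mvn clean package -DskipTests
# Replace the connector jar under Flink's lib directory
cp flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-2.1-SNAPSHOT.jar ${FLINK_HOME}/lib/
```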

4. Upgrading from Flink 1.13 to Flink 1.15

In 1.15, the flink-kafka-connector no longer uses FlinkKafkaProducer; it has been replaced by KafkaSink:

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("xxx:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("xxx")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setKafkaProducerConfig(producerProperties)
        .build();

5. decimal columns become strings

Environment: Flink 1.15.4, Flink CDC 2.4.0, DataStream API

Symptom: When syncing a MySQL table through Flink CDC 2.4.0, decimal columns arrive in Kafka as strings. Stepping through with a debugger showed the decimal data being parsed as Base64. Consulting the Debezium documentation turned up the decimal.handling.mode option, which defaults to precise; the other choices are double and string.

Fix: When building the MySqlSource object, construct a java.util.Properties object for the debeziumProperties() method and set the decimal.handling.mode option to string.
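As a minimal sketch (the class name here is just for illustration), the properties object looks like this; it is then passed to the builder's debeziumProperties() method:

```java
import java.util.Properties;

public class DecimalHandlingExample {
    // Build the Debezium options for the MySQL source
    public static Properties debeziumProps() {
        Properties props = new Properties();
        // Emit DECIMAL columns as plain strings instead of the Base64-encoded
        // byte representation produced by the default "precise" mode
        props.setProperty("decimal.handling.mode", "string");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(debeziumProps().getProperty("decimal.handling.mode")); // prints string
    }
}
```

The resulting Properties object is attached with .debeziumProperties(props) when building the MySqlSource, as shown in section 7 below.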

6. Flink CDC time-zone mismatch

Environment: Flink 1.15.4, Flink CDC 2.4.0, DataStream API

Symptom: When creating a MySQL source with the Flink CDC DataStream API, if the configured serverTimeZone does not match the database's actual time zone, the job fails immediately with the following error:

Caused by: org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (28800 seconds ahead of UTC) which does not match the configured timezone UTC-05:00. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
    at com.ververica.cdc.connectors.mysql.MySqlValidator.checkTimeZone(MySqlValidator.java:191) ~[?:?]
    at com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:81) ~[?:?]
    at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:172) ~[?:?]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:198) ~[flink-dist-1.15-vvr-6.0.5-SNAPSHOT.jar:1.15-vvr-6.0.5-SNAPSHOT]
    ... 44 more

Fix: Set serverTimeZone to the same time zone as the database.
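The offset in the message identifies the zone directly: 28800 seconds ahead of UTC is UTC+8, so in this example serverTimeZone should be "UTC+8" (or an equivalent zone ID such as "Asia/Shanghai"). A quick stdlib check of the arithmetic:

```java
import java.time.ZoneOffset;

public class OffsetCheck {
    // Convert the offset reported by the MySQL validator into a zone-offset ID
    public static String offsetId() {
        return ZoneOffset.ofTotalSeconds(28800).toString();
    }

    public static void main(String[] args) {
        System.out.println(offsetId()); // prints +08:00
    }
}
```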

7. datetime columns come through 8 hours ahead

Environment: Flink 1.15.4, Flink CDC 2.4.0, DataStream API, MySQL in UTC+8

Symptom: When syncing data from MySQL to Kafka with the Flink CDC DataStream API and the bundled JsonDebeziumDeserializationSchema, datetime columns are converted to epoch-millisecond timestamps that are 8 hours too large. Switching to a database in UTC-5, the timestamps came out 5 hours too small. This behavior has existed for a long time and Flink CDC has never changed it.
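The 8-hour shift is exactly what happens when a wall-clock DATETIME value is interpreted as UTC rather than in the database's zone. A stdlib sketch of the arithmetic (illustrative values; this is not Flink CDC code):

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class DatetimeShiftDemo {
    // Difference between interpreting a DATETIME wall-clock value as UTC
    // (what the deserializer effectively does) and in the database's UTC+8 zone
    public static long shiftMillis() {
        LocalDateTime wallClock = LocalDateTime.of(2023, 7, 1, 8, 0, 0);
        // Correct: interpret the value in the database's zone
        long correctMillis = wallClock.atZone(ZoneId.of("UTC+8")).toInstant().toEpochMilli();
        // What is emitted: the same value interpreted as UTC
        long emittedMillis = wallClock.toInstant(ZoneOffset.UTC).toEpochMilli();
        return emittedMillis - correctMillis;
    }

    public static void main(String[] args) {
        System.out.println(shiftMillis()); // prints 28800000, i.e. 8 hours
    }
}
```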

Fix: If you don't want to implement your own deserialization schema, you can write a custom converter for the datetime type following the Debezium documentation; if you do want to implement one, refer to the RowDataDebeziumDeserializeSchema class mentioned in the Flink CDC FAQ. The custom Debezium converter code follows.

package com.hcloud.flink.tools;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 * Custom converter for MySQL datetime columns.
 */
public class CustomTimeConvert implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private static final Logger log = LoggerFactory.getLogger(CustomTimeConvert.class);

    // Default output format
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Default zone; if the serverTimeZone parameter is set, this is its value
    private ZoneId timezoneId = ZoneId.systemDefault();

    @Override
    public void configure(Properties properties) {
        String datetimeFormat = properties.getProperty("datetime.formatter");
        if (datetimeFormat != null && datetimeFormat.trim().length() > 0) {
            datetimeFormatter = DateTimeFormatter.ofPattern(datetimeFormat.trim());
        } else {
            log.warn("datetime.formatter is null, use default formatter: yyyy-MM-dd HH:mm:ss");
        }
    }

    @Override
    public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
        // Column type name
        String sqlType = relationalColumn.typeName().toUpperCase();
        SchemaBuilder schemaBuilder = null;
        Converter converter = null;
        // Custom handling for DATETIME columns only
        if ("DATETIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("debezium.mysql.datetime.string");
            converter = this::convertDatetime;
        }
        // Register the converter
        if (schemaBuilder != null) {
            converterRegistration.register(schemaBuilder, converter);
        }
    }

    // Format a datetime value. During the snapshot phase the value arrives as
    // java.sql.Timestamp; during the binlog phase it arrives as java.time.LocalDateTime.
    private String convertDatetime(Object input) {
        if (input != null) {
            if (input instanceof Timestamp) {
                Timestamp timestamp = (Timestamp) input;
                LocalDateTime localDateTime = timestamp.toLocalDateTime();
                return datetimeFormatter.format(localDateTime);
            } else if (input instanceof LocalDateTime) {
                return datetimeFormatter.format((LocalDateTime) input);
            } else {
                log.warn("field is not java.sql.Timestamp or java.time.LocalDateTime, current field type is {}", input.getClass().toString());
                return null;
            }
        } else {
            return null;
        }
    }
}

Usage: add the following configuration when creating the MySQL source.

// Create the MySQL source
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("decimal.handling.mode", "string");
debeziumProperties.setProperty("bigint.unsigned.handling.mode", "long");
// Required: the converter's name, which also prefixes its other options
debeziumProperties.setProperty("converters", "myConverter");
// Required: the converter's implementation class
debeziumProperties.setProperty("myConverter.type", "com.hcloud.flink.tools.CustomTimeConvert");
// Optional (custom option): the output format
debeziumProperties.setProperty("myConverter.datetime.formatter", "yyyy-MM-dd HH:mm:ss");

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("ip")
        .port(3306)
        .databaseList("db")
        .tableList("db.table")
        .username("user")
        .password("password")
        .serverTimeZone("UTC+8")
        .startupOptions(StartupOptions.latest())
        .deserializer(new JsonDebeziumDeserializationSchema())
        .debeziumProperties(debeziumProperties)
        .build();

Since our company currently uses no temporal types other than datetime, conversion for the other temporal types is not implemented here; many more comprehensive custom Debezium converters can be found online for reference.
