当前位置:   article > 正文

FlinkCDC读取MySQL中的日期问题_flinkcdc读取mysql date 类型

flinkcdc读取mysql date 类型

问题描述:

在通过FlinkCDC读取MySQL的BinLog日志时,发现读取到的日期类型数据与数据库中存储的值相差八小时。Flink版本:1.15.1,MySQL CDC版本:2.3.0。如下图:

解决办法:

自定义时间转换器(实现Debezium的CustomConverter接口),并通过debeziumProperties注册。代码如下:

  1. package com.yzh;
  2. import java.time.Duration;
  3. import java.time.LocalDate;
  4. import java.time.LocalDateTime;
  5. import java.time.LocalTime;
  6. import java.time.ZoneId;
  7. import java.time.ZonedDateTime;
  8. import java.time.format.DateTimeFormatter;
  9. import java.util.Properties;
  10. import io.debezium.spi.converter.CustomConverter;
  11. import io.debezium.spi.converter.RelationalColumn;
  12. import org.apache.kafka.connect.data.SchemaBuilder;
  13. /**
  14. * @Description:实现CustomConverter接口,重写对应方法对mysql的时间类型进行标准转换
  15. * @author yzh
  16. *
  17. */
  18. public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
  19. private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
  20. private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
  21. private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
  22. private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
  23. private ZoneId timestampZoneId = ZoneId.systemDefault();
  24. @Override
  25. public void configure(Properties properties) {}
  26. @Override
  27. public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
  28. String sqlType = column.typeName().toUpperCase();
  29. SchemaBuilder schemaBuilder = null;
  30. Converter converter = null;
  31. if ("DATE".equals(sqlType)) {
  32. schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
  33. converter = this::convertDate;
  34. }
  35. if ("TIME".equals(sqlType)) {
  36. schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
  37. converter = this::convertTime;
  38. }
  39. if ("DATETIME".equals(sqlType)) {
  40. schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
  41. converter = this::convertDateTime;
  42. }
  43. if ("TIMESTAMP".equals(sqlType)) {
  44. schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
  45. converter = this::convertTimestamp;
  46. }
  47. if (schemaBuilder != null) {
  48. registration.register(schemaBuilder, converter);
  49. }
  50. }
  51. private String convertDate(Object input) {
  52. if (input == null)
  53. return null;
  54. if (input instanceof LocalDate) {
  55. return dateFormatter.format((LocalDate) input);
  56. }
  57. if (input instanceof Integer) {
  58. LocalDate date = LocalDate.ofEpochDay((Integer) input);
  59. return dateFormatter.format(date);
  60. }
  61. return String.valueOf(input);
  62. }
  63. private String convertTime(Object input) {
  64. if (input == null)
  65. return null;
  66. if (input instanceof Duration) {
  67. Duration duration = (Duration) input;
  68. long seconds = duration.getSeconds();
  69. int nano = duration.getNano();
  70. LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
  71. return timeFormatter.format(time);
  72. }
  73. return String.valueOf(input);
  74. }
  75. private String convertDateTime(Object input) {
  76. if (input == null)
  77. return null;
  78. if (input instanceof LocalDateTime) {
  79. return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " ");
  80. }
  81. return String.valueOf(input);
  82. }
  83. private String convertTimestamp(Object input) {
  84. if (input == null)
  85. return null;
  86. if (input instanceof ZonedDateTime) {
  87. // mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间
  88. ZonedDateTime zonedDateTime = (ZonedDateTime) input;
  89. LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
  90. return timestampFormatter.format(localDateTime).replaceAll("T", " ");
  91. }
  92. return String.valueOf(input);
  93. }
  94. }

--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown

  1. // Key code: register the custom converter through Debezium properties and hand them to the MySQL CDC source
  2. Properties debeziumProperties = new Properties();
  3. debeziumProperties.setProperty("converters", "dateConverters");
  4. debeziumProperties.setProperty("dateConverters.type", "com.yzh.MySqlDateTimeConverter"); // replace "com.yzh.MySqlDateTimeConverter" with the fully-qualified name of your own converter class
  5. MySqlSource<String> sourceFunction = MySqlSource.<String>builder()
  6. .hostname("localhost")
  7. .port(3306)
  8. .username("root")
  9. .password("root")
  10. .databaseList("1226")
  11. .tableList("1226.users")
  12. .deserializer(new JsonDebeziumDeserializationSchema())
  13. .startupOptions(StartupOptions.initial())
  14. .serverTimeZone("Asia/Shanghai")
  15. .debeziumProperties(debeziumProperties) // pass the custom Debezium properties built above
  16. .build();

结果:

完美解决!!!!!

参考:

实测解决 flink cdc mysql 时间字段差8小时/差13小时问题_普罗米修斯之火的博客-CSDN博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/928049
推荐阅读
相关标签
  

闽ICP备14008679号