赞
踩
在通过 Flink CDC 读取 MySQL 的 binlog 时,发现读取到的日期时间类型数据与数据库中存储的值相差八小时(环境:Flink 1.15.1,MySQL CDC 2.3.0),如下图所示。
解决办法:自定义一个 Debezium 时间类型转换器(实现 CustomConverter 接口),并通过 Debezium 配置注册到 MySqlSource。代码如下:
-
- package com.yzh;
-
- import java.time.Duration;
- import java.time.LocalDate;
- import java.time.LocalDateTime;
- import java.time.LocalTime;
- import java.time.ZoneId;
- import java.time.ZonedDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.Properties;
-
- import io.debezium.spi.converter.CustomConverter;
- import io.debezium.spi.converter.RelationalColumn;
- import org.apache.kafka.connect.data.SchemaBuilder;
-
- /**
-  * Debezium {@link CustomConverter} that renders MySQL temporal columns as plain strings.
-  *
-  * <p>Columns whose type name is DATE, TIME, DATETIME or TIMESTAMP (compared case-insensitively)
-  * are registered with an optional string schema and converted as follows:
-  * <ul>
-  *   <li>DATE - ISO date, e.g. {@code 2022-12-26}</li>
-  *   <li>TIME - ISO time, e.g. {@code 10:15:30}; values outside a single day are rendered as
-  *       signed {@code [-]HH:mm:ss[.fraction]}</li>
-  *   <li>DATETIME - ISO date-time with the {@code T} separator replaced by a space</li>
-  *   <li>TIMESTAMP - shifted to the JVM default time zone, then formatted like DATETIME</li>
-  * </ul>
-  * Values that arrive as any other Java type are passed through {@link String#valueOf(Object)}.
-  *
-  * @author yzh
-  */
- public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
-     private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ISO_DATE;
-     private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ISO_TIME;
-     private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ISO_DATE_TIME;
-     private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ISO_DATE_TIME;
-
-     /** Number of seconds in one day; LocalTime can only represent values below this bound. */
-     private static final long SECONDS_PER_DAY = 24L * 60L * 60L;
-
-     /** Zone that TIMESTAMP values are shifted into before formatting (the JVM default zone). */
-     private final ZoneId timestampZoneId = ZoneId.systemDefault();
-
-     /**
-      * No-op: this converter does not read any configuration properties.
-      *
-      * @param properties converter properties supplied by Debezium (ignored)
-      */
-     @Override
-     public void configure(Properties properties) {}
-
-     /**
-      * Registers a string schema and a conversion function for DATE, TIME, DATETIME and TIMESTAMP
-      * columns; columns of any other type are left untouched.
-      *
-      * @param column the column being inspected
-      * @param registration callback used to register the schema and converter for the column
-      */
-     @Override
-     public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
-         // equalsIgnoreCase is locale-independent (unlike toUpperCase() with the default locale,
-         // which breaks "time"/"timestamp" under e.g. a Turkish locale) and tolerates null.
-         final String sqlType = column.typeName();
-         final SchemaBuilder schemaBuilder;
-         final Converter converter;
-
-         if ("DATE".equalsIgnoreCase(sqlType)) {
-             schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
-             converter = this::convertDate;
-         } else if ("TIME".equalsIgnoreCase(sqlType)) {
-             schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
-             converter = this::convertTime;
-         } else if ("DATETIME".equalsIgnoreCase(sqlType)) {
-             schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
-             converter = this::convertDateTime;
-         } else if ("TIMESTAMP".equalsIgnoreCase(sqlType)) {
-             schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
-             converter = this::convertTimestamp;
-         } else {
-             // Not a temporal type handled here: keep Debezium's default conversion.
-             return;
-         }
-
-         registration.register(schemaBuilder, converter);
-     }
-
-     /**
-      * Converts a DATE value to an ISO date string.
-      *
-      * @param input a {@link LocalDate}, an {@link Integer} epoch-day count, any other object, or null
-      * @return the formatted date, {@code String.valueOf(input)} for other types, or null for null
-      */
-     private String convertDate(Object input) {
-         if (input == null) {
-             return null;
-         }
-         if (input instanceof LocalDate) {
-             return DATE_FORMATTER.format((LocalDate) input);
-         }
-         if (input instanceof Integer) {
-             // An Integer is interpreted as the number of days since 1970-01-01.
-             return DATE_FORMATTER.format(LocalDate.ofEpochDay((Integer) input));
-         }
-         return String.valueOf(input);
-     }
-
-     /**
-      * Converts a TIME value to a string.
-      *
-      * <p>A {@link Duration} within a single day (0 up to, but excluding, 24 hours) is formatted as
-      * an ISO time. MySQL TIME may also hold negative values and values of 24 hours or more, which
-      * {@link LocalTime} cannot represent; those are rendered as {@code [-]HH:mm:ss[.fraction]}
-      * instead of failing with a {@code DateTimeException}.
-      *
-      * @param input a {@link Duration}, any other object, or null
-      * @return the formatted time, {@code String.valueOf(input)} for other types, or null for null
-      */
-     private String convertTime(Object input) {
-         if (input == null) {
-             return null;
-         }
-         if (input instanceof Duration) {
-             Duration duration = (Duration) input;
-             long seconds = duration.getSeconds();
-             if (seconds >= 0 && seconds < SECONDS_PER_DAY) {
-                 LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(duration.getNano());
-                 return TIME_FORMATTER.format(time);
-             }
-             return formatElapsedTime(duration);
-         }
-         return String.valueOf(input);
-     }
-
-     /**
-      * Converts a DATETIME value to {@code yyyy-MM-dd HH:mm:ss[.fraction]}.
-      *
-      * @param input a {@link LocalDateTime}, any other object, or null
-      * @return the formatted value, {@code String.valueOf(input)} for other types, or null for null
-      */
-     private String convertDateTime(Object input) {
-         if (input == null) {
-             return null;
-         }
-         if (input instanceof LocalDateTime) {
-             // ISO output uses 'T' between date and time; 'T' occurs nowhere else in the text.
-             return DATETIME_FORMATTER.format((LocalDateTime) input).replace('T', ' ');
-         }
-         return String.valueOf(input);
-     }
-
-     /**
-      * Converts a TIMESTAMP value to local date-time text in {@link #timestampZoneId}.
-      *
-      * @param input a {@link ZonedDateTime}, any other object, or null
-      * @return the formatted value, {@code String.valueOf(input)} for other types, or null for null
-      */
-     private String convertTimestamp(Object input) {
-         if (input == null) {
-             return null;
-         }
-         if (input instanceof ZonedDateTime) {
-             // MySQL stores TIMESTAMP values in UTC, so the ZonedDateTime received here is in UTC;
-             // shift it to the target zone (same instant) before dropping the zone information.
-             ZonedDateTime zonedDateTime = (ZonedDateTime) input;
-             LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
-             return TIMESTAMP_FORMATTER.format(localDateTime).replace('T', ' ');
-         }
-         return String.valueOf(input);
-     }
-
-     /**
-      * Formats a duration that does not fit into a time of day as {@code [-]HH:mm:ss[.fraction]}.
-      * Hours are padded to at least two digits; the fraction omits trailing zeros and is left out
-      * entirely when the duration has no sub-second part.
-      */
-     private static String formatElapsedTime(Duration duration) {
-         boolean negative = duration.isNegative();
-         // Work on the magnitude so that seconds and nanos are both non-negative.
-         Duration magnitude = negative ? duration.negated() : duration;
-         long totalSeconds = magnitude.getSeconds();
-
-         StringBuilder text = new StringBuilder(20);
-         if (negative) {
-             text.append('-');
-         }
-         appendTwoDigits(text, totalSeconds / 3600);
-         text.append(':');
-         appendTwoDigits(text, (totalSeconds % 3600) / 60);
-         text.append(':');
-         appendTwoDigits(text, totalSeconds % 60);
-
-         int nano = magnitude.getNano();
-         if (nano != 0) {
-             // Adding 10^9 and dropping the leading '1' yields the nanos zero-padded to 9 digits.
-             String fraction = Integer.toString(1_000_000_000 + nano).substring(1);
-             int end = fraction.length();
-             while (fraction.charAt(end - 1) == '0') {
-                 end--;
-             }
-             text.append('.').append(fraction, 0, end);
-         }
-         return text.toString();
-     }
-
-     /** Appends a non-negative value, left-padded with a zero to at least two digits. */
-     private static void appendTwoDigits(StringBuilder target, long value) {
-         if (value < 10) {
-             target.append('0');
-         }
-         target.append(value);
-     }
- }
--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown
- // Key code: register the custom converter through Debezium properties and hand them to the MySQL CDC source
- Properties debeziumProperties = new Properties();
- debeziumProperties.setProperty("converters", "dateConverters");
- debeziumProperties.setProperty("dateConverters.type", "com.yzh.MySqlDateTimeConverter"); // replace "com.yzh.MySqlDateTimeConverter" with the fully qualified name of your own converter class
-
-
- MySqlSource<String> sourceFunction = MySqlSource.<String>builder()
- .hostname("localhost")
- .port(3306)
- .username("root")
- .password("root")
- .databaseList("1226")
- .tableList("1226.users")
- .deserializer(new JsonDebeziumDeserializationSchema())
- .startupOptions(StartupOptions.initial())
- .serverTimeZone("Asia/Shanghai")
- .debeziumProperties(debeziumProperties) // pass the custom Debezium properties built above to the source
- .build();
完美解决!!!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。