赞
踩
mysql同步表结构
mysql中的timestamp字段是可以正常同步的,但是多了8小时,设置了mysql链接属性也没效果
CREATE TABLE `temp_flink` (
`id` int(11) NOT NULL,
`name` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
`remark` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
`create_date` datetime DEFAULT NULL,
`create_time` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
参考下方的链接有两种方式;
import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import org.apache.kafka.connect.data.SchemaBuilder; import java.time.*; import java.time.format.DateTimeFormatter; import java.util.Properties; /** * mysql日期字段时区/格式处理 * @author JGMa */ public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> { private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE; private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME; private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME; private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME; private ZoneId timestampZoneId = ZoneId.systemDefault(); @Override public void configure(Properties props) { } @Override public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) { String sqlType = column.typeName().toUpperCase(); SchemaBuilder schemaBuilder = null; Converter converter = null; if ("DATE".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string"); converter = this::convertDate; } if ("TIME".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string"); converter = this::convertTime; } if ("DATETIME".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string"); converter = this::convertDateTime; } if ("TIMESTAMP".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string"); converter = this::convertTimestamp; } if (schemaBuilder != null) { registration.register(schemaBuilder, converter); } } private String convertDate(Object input) { if (input == null) { return null; } if (input instanceof LocalDate) { return dateFormatter.format((LocalDate) input); } if (input instanceof Integer) { LocalDate date = LocalDate.ofEpochDay((Integer) input); return dateFormatter.format(date); } return String.valueOf(input); } private String convertTime(Object input) { if (input == null) { return null; } if (input instanceof Duration) { Duration duration = (Duration) input; long seconds = duration.getSeconds(); int nano = duration.getNano(); LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano); return timeFormatter.format(time); } return String.valueOf(input); } private String convertDateTime(Object input) { if (input == null) { return null; } if (input instanceof LocalDateTime) { return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " "); } return String.valueOf(input); } private String convertTimestamp(Object input) { if (input == null) { return null; } if (input instanceof ZonedDateTime) { // mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间 ZonedDateTime zonedDateTime = (ZonedDateTime) input; LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime(); return timestampFormatter.format(localDateTime).replaceAll("T", " "); } return String.valueOf(input); } }
{ public static void main(String[] args) { String tableName = "temp_flink"; String srcHost = "192.168.10.14"; String srcDatabase = "xcode"; String srcUsername = "root"; String srcPassword = "123456"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); Properties mysqlProperties = new Properties(); // mysqlProperties.setProperty("characterEncoding","UTF-8"); // mysqlProperties.setProperty("connectionTimeZone","Asia/Shanghai"); //自定义时间转换配置 mysqlProperties.setProperty("converters", "dateConverters"); mysqlProperties.setProperty("dateConverters.type", "com.txlc.flink.core.MySqlDateTimeConverter"); MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname(srcHost) // .jdbcProperties(mysqlProperties) .port(3306) .databaseList(srcDatabase) .tableList(srcDatabase + "." + tableName) .username(srcUsername) .password(srcPassword) // .serverTimeZone("Asia/Shanghai") // 主要是这里 .debeziumProperties(mysqlProperties) .deserializer(new JsonStringDebeziumDeserializationSchema()) .build(); DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[temp_flink-source]") .setParallelism(1); streamSource.addSink(StarRocksSink.sink( // the sink options StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://192.168.10.245:9030?characterEncoding=utf-8") .withProperty("load-url", "192.168.10.245:8030") .withProperty("database-name", "xcode") .withProperty("username", "root") .withProperty("password", "123456") .withProperty("table-name", tableName) // 自 2.4 版本,支持更新主键模型中的部分列。您可以通过以下两个属性指定需要更新的列。 // .withProperty("sink.properties.partial_update", "true") // .withProperty("sink.properties.columns", "k1,k2,k3") .withProperty("sink.properties.format", "json") .withProperty("sink.properties.strip_outer_array", "true") // 设置并行度,多并行度情况下需要考虑如何保证数据有序性 .withProperty("sink.parallelism", "1") .build()) ).name(">>>StarRocks temp_flink Sink<<<"); try { env.execute("temp_flink stream sync"); } catch (Exception e) { e.printStackTrace(); log.error("[sync error] info : {}", e); } } }
参考资料
https://blog.csdn.net/cloudbigdata/article/details/122935333
https://blog.csdn.net/WuBoooo/article/details/127387144
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。