The project needs to capture data changes from an external data source and synchronize them to a target database in real time, automatically keeping every table in the source and target databases up to date. This post records an implementation for the MySQL -> Greenplum scenario. The Maven dependencies used are:
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-sql-connector-mysql-cdc</artifactId>
- <version>2.4.1</version>
- </dependency>
-
- <!-- mysql-->
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.29</version>
- </dependency>
- <dependency>
- <groupId>org.postgresql</groupId>
- <artifactId>postgresql</artifactId>
- <version>42.5.0</version>
- </dependency>
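Only the CDC connector and the JDBC drivers are listed above; the job below also assumes the Flink runtime, the Table API bridge, and the Flink JDBC connector (which provides the MySqlCatalog used later) are on the classpath. A sketch of the additional coordinates, assuming a Flink 1.16.x cluster (flink-sql-connector-mysql-cdc 2.4.1 targets Flink 1.13 through 1.17; adjust versions to your environment):
- <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-streaming-java</artifactId>
-     <version>1.16.2</version>
- </dependency>
- <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-table-api-java-bridge</artifactId>
-     <version>1.16.2</version>
- </dependency>
- <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-connector-jdbc</artifactId>
-     <version>1.16.2</version>
- </dependency>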
Create the Flink CDC connector, configure the source connection information and the starting point for log capture, and read and cache the source database's metadata:
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.Schema;
- import org.apache.flink.table.catalog.DefaultCatalogTable;
- import org.apache.flink.table.catalog.ObjectPath;
- import org.apache.flink.table.catalog.exceptions.TableNotExistException;
- import org.apache.flink.table.types.DataType;
- import org.apache.flink.table.types.logical.LogicalType;
- import org.apache.flink.table.types.logical.RowType;
- import org.apache.flink.types.Row;
-
- import java.util.*;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.stream.Collectors;
-
- /**
- * Open items from the original post: automatic creation of the target tables,
- * handling of table-structure changes.
- */
- @Slf4j
- public class FlinkCdcMultiSyncJdbc {
-
- public static CdcPro cdcPro;
-
- public static MySqlCatalog mysqlCatalog;
-
- public static List<String> tables;
-
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- init();
-
- // Monitor the MySQL binlog: create the Flink-MySQL-CDC source.
- // Startup options:
- // initial (default): take an initial snapshot of the monitored tables on the first start, then keep reading the latest binlog (with checkpointing enabled, consumption resumes from the last offset)
- // latest-offset: never snapshot on the first start; read only from the end of the binlog, i.e. only changes made after the connector started
- // timestamp: never snapshot on the first start; read the binlog from the specified timestamp (the reader scans the binlog from the beginning and skips change events earlier than that timestamp)
- // specific-offset: never snapshot on the first start; read the binlog from the specified offset
- Properties jdbcProperties = new Properties();
- jdbcProperties.setProperty("time_zone", "+8:00");
- jdbcProperties.setProperty("serverTimeZone", "Asia/Shanghai");
- MySqlSource<Tuple2<String, Row>> mySqlSource = MySqlSource.<Tuple2<String, Row>>builder()
- .hostname(cdcPro.getHost())
- .port(cdcPro.getPort())
- .databaseList(cdcPro.getDb())
- .tableList(tables.toArray(new String[0]))
- // .scanNewlyAddedTableEnabled(true) // enable discovery of newly added tables
- .username(cdcPro.getUserName())
- .password(cdcPro.getPassWord())
- .jdbcProperties(jdbcProperties)
- .deserializer(new MySQLDebeziumDeserializer(buildTableRowTypeMap(cdcPro, tables)))
- .startupOptions(StartupOptions.latest())
- .includeSchemaChanges(false)
- .build();
-
- SingleOutputStreamOperator<Tuple2<String, Row>> dataStreamSource = env.fromSource(mySqlSource,
- WatermarkStrategy.noWatermarks(), "mysql cdc").disableChaining();
- dataStreamSource.addSink(new DwSink(cdcPro));
- env.execute("flinkcdc_mysql_dw");
- }
-
- private static void init() throws Exception {
- cdcPro = SourceUtil.createCdcPro();
- String source_url = String.format("jdbc:mysql://%s:%d", cdcPro.getHost(), cdcPro.getPort());
- // register a catalog for the source database being synced
- mysqlCatalog = new MySqlCatalog(new ClassLoader() {
- @Override
- public Class<?> loadClass(String name) throws ClassNotFoundException {
- return super.loadClass(name);
- }
- }, "mysql-catalog", cdcPro.getDb(), cdcPro.getUserName(), cdcPro.getPassWord(), source_url);
- // List<String> dbs = mysqlCatalog.listDatabases();
- tables = buildTables(cdcPro);
- System.setProperty("HADOOP_USER_NAME", "root");
- }
-
- private static List<String> buildTables(CdcPro cdcPro) throws Exception {
- // For whole-database sync, list all tables from the catalog; otherwise use the configured table list
- List<String> tables = new ArrayList<>();
- if (".*".equals(cdcPro.getTableList())) {
- tables = mysqlCatalog.listTables(cdcPro.getDb());
- } else {
- String[] tableArray = cdcPro.getTableList().split(",");
- for (String table : tableArray) {
- tables.add(table);
- }
- }
-
- // drop the excluded tables and qualify the remaining names with the database name
- tables = tables.stream().filter(tableName->!cdcPro.getExcludeTables().contains(tableName)).map(t->cdcPro.getDb() + "." + t).collect(Collectors.toList());
- log.info("cdc tables: \n" + tables);
- return tables;
- }
-
- private static Map<String, RowType> buildTableRowTypeMap(CdcPro cdcPro, List<String> tables) throws Exception {
- // ConcurrentHashMap because the map is populated from the parallel stream below
- Map<String, RowType> tableRowTypeMap = new ConcurrentHashMap<>();
- // Map<String, RowTypeInfo> tableTypeInformationMap = Maps.newConcurrentMap();
- tables.parallelStream().forEach(table ->{
- String tableName = StringUtils.replace(table, cdcPro.getDb() + ".", "");
- // look up the table registered in the MySQL catalog
- ObjectPath objectPath = new ObjectPath(cdcPro.getDb(), tableName);
- DefaultCatalogTable catalogBaseTable = null;
- try {
- catalogBaseTable = (DefaultCatalogTable) mysqlCatalog.getTable(objectPath);
- } catch (TableNotExistException e) {
- throw new RuntimeException(e);
- }
- // get the table's schema
- Schema schema = catalogBaseTable.getUnresolvedSchema();
- // collect the field names and logical types of the table
- String[] fieldNames = new String[schema.getColumns().size()];
- LogicalType[] logicalTypes = new LogicalType[schema.getColumns().size()];
- for (int i = 0; i < schema.getColumns().size(); i++) {
- Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) schema.getColumns().get(i);
- fieldNames[i] = column.getName();
- logicalTypes[i] = ((DataType) column.getDataType()).getLogicalType();
-
- }
- tableRowTypeMap.put(tableName, RowType.of(logicalTypes, fieldNames));
- });
- return tableRowTypeMap;
- }
-
- }
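CdcPro and SourceUtil are referenced by the job above but were not included in the original post. A minimal sketch of what they could look like, inferred from the getters used (all field and property names are assumptions; Lombok is assumed for getters/setters, and PropUtils is sketched further below; in practice each class goes in its own file):
- import java.net.URI;
- import java.util.Arrays;
- import java.util.List;
- import java.util.Properties;
- import lombok.Data;
-
- // Plain configuration holder; fields mirror the getters used by the CDC job.
- @Data
- public class CdcPro {
-     private String host;
-     private int port;
-     private String db;
-     private String userName;
-     private String passWord;
-     private String tableList;        // ".*" for the whole database, or a comma-separated list
-     private List<String> excludeTables;
-     private String columnConver;     // "uppercase" / "lowercase" / empty
- }
-
- // Builds a CdcPro from the properties file shown at the end of this post.
- public class SourceUtil {
-     public static CdcPro createCdcPro() throws Exception {
-         Properties p = PropUtils.getProperties();
-         // mysql_jdbc looks like jdbc:mysql://host:3306/dbname?...; strip "jdbc:" so URI can parse it
-         URI uri = URI.create(p.getProperty("mysql_jdbc").substring("jdbc:".length()));
-         CdcPro pro = new CdcPro();
-         pro.setHost(uri.getHost());
-         pro.setPort(uri.getPort());
-         pro.setDb(uri.getPath().substring(1));
-         pro.setUserName(p.getProperty("mysql_userName"));
-         pro.setPassWord(p.getProperty("mysql_passWord"));
-         pro.setTableList(p.getProperty("mysql_tableList"));
-         pro.setExcludeTables(Arrays.asList(p.getProperty("excluded_tables", "").split(",")));
-         pro.setColumnConver(p.getProperty("column_conver"));
-         return pro;
-     }
- }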
Create a custom deserializer that parses the changed table, the row data, and the operation type from the change log captured by Flink CDC. It converts the parsed values into Flink data types, sets each row's changelog kind (RowKind), and emits a (table name, row) tuple into the Flink collector.
- import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal;
- import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
- import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
- import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
- import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
- import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
- import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
- import com.ververica.cdc.debezium.utils.TemporalConversions;
- import io.debezium.data.Envelope;
- import io.debezium.data.SpecialValueDecimal;
- import io.debezium.data.VariableScaleDecimal;
- import io.debezium.time.*;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.flink.api.common.typeinfo.TypeHint;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
- import org.apache.flink.table.catalog.DefaultCatalogTable;
- import org.apache.flink.table.catalog.ObjectPath;
- import org.apache.flink.table.data.DecimalData;
- import org.apache.flink.table.data.StringData;
- import org.apache.flink.table.data.TimestampData;
- import org.apache.flink.table.types.DataType;
- import org.apache.flink.table.types.logical.DecimalType;
- import org.apache.flink.table.types.logical.LogicalType;
- import org.apache.flink.table.types.logical.RowType;
- import org.apache.flink.types.Row;
- import org.apache.flink.types.RowKind;
- import org.apache.flink.types.RowUtils;
- import org.apache.flink.util.Collector;
-
- import java.math.BigDecimal;
- import java.nio.ByteBuffer;
- import java.time.Instant;
- import java.time.LocalDateTime;
- import java.time.LocalTime;
- import java.time.ZoneId;
- import java.util.LinkedHashMap;
- import java.util.Map;
- import java.util.Properties;
-
- @Slf4j
- public class MySQLDebeziumDeserializer implements DebeziumDeserializationSchema<Tuple2<String, Row>> {
-
- private final Map<String, RowType> tableRowTypeMap;
- private final Map<String, DeserializationRuntimeConverter> physicalConverterMap = Maps.newConcurrentMap();
-
- MySQLDebeziumDeserializer(Map<String, RowType> tableRowTypeMap) {
- this.tableRowTypeMap = tableRowTypeMap;
- for (String tableName : this.tableRowTypeMap.keySet()) {
- RowType rowType = this.tableRowTypeMap.get(tableName);
- DeserializationRuntimeConverter physicalConverter = createNotNullConverter(rowType);
- this.physicalConverterMap.put(tableName, physicalConverter);
- }
- }
-
- @Override
- public void deserialize(SourceRecord record, Collector out) throws Exception {
- Struct value = (Struct) record.value();
- // log.info("value:{}", value);
- String tableName = value.getStruct("source").get("table").toString();
-
- // skip events for tables that were not registered during initialization
- if(!physicalConverterMap.containsKey(tableName)){
- return;
- }
-
- DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tableName);
- Envelope.Operation op = Envelope.operationFor(record);
-
- if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
- Row insert = extractAfterRow(record, physicalConverter);
- insert.setKind(RowKind.INSERT);
- out.collect(Tuple2.of(tableName, insert));
- } else if (op == Envelope.Operation.DELETE) {
- Row delete = extractBeforeRow(record, physicalConverter);
- delete.setKind(RowKind.DELETE);
- out.collect(Tuple2.of(tableName, delete));
- } else if (op == Envelope.Operation.UPDATE) {
- Row before = extractBeforeRow(record, physicalConverter);
- before.setKind(RowKind.UPDATE_BEFORE);
- out.collect(Tuple2.of(tableName, before));
-
- Row after = extractAfterRow(record, physicalConverter);
- after.setKind(RowKind.UPDATE_AFTER);
- out.collect(Tuple2.of(tableName, after));
- }
-
- }
-
- private Row extractAfterRow(SourceRecord record, DeserializationRuntimeConverter physicalConverter) throws Exception {
- Schema afterSchema = record.valueSchema().field(Envelope.FieldName.AFTER).schema();
- Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
- return (Row) physicalConverter.convert(after, afterSchema);
- }
-
- private Row extractBeforeRow(SourceRecord record, DeserializationRuntimeConverter physicalConverter) throws Exception {
- Schema beforeSchema = record.valueSchema().field(Envelope.FieldName.BEFORE).schema();
- Struct before = ((Struct) record.value()).getStruct(Envelope.FieldName.BEFORE);
- return (Row) physicalConverter.convert(before, beforeSchema);
- }
-
- @Override
- public TypeInformation<Tuple2<String, Row>> getProducedType() {
- return TypeInformation.of(new TypeHint<Tuple2<String, Row>>() {
- });
- }
-
- public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
- /**
- * MySQL type -> Flink logical type handled below:
- * datetime  -> TIMESTAMP_WITHOUT_TIME_ZONE
- * date      -> DATE
- * time      -> TIME_WITHOUT_TIME_ZONE
- * timestamp -> TIMESTAMP_WITHOUT_TIME_ZONE
- */
- switch (type.getTypeRoot()) {
- case NULL:
- return convertToNull();
- case BOOLEAN:
- return convertToInt();
- case TINYINT:
- return convertToInt();
- case SMALLINT:
- return convertToInt();
- case INTEGER:
- case INTERVAL_YEAR_MONTH:
- return convertToInt();
- case BIGINT:
- case INTERVAL_DAY_TIME:
- return convertToLong();
- case DATE:
- return convertToDate();
- case TIME_WITHOUT_TIME_ZONE:
- return convertToTime();
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- return convertToTimestamp(ZoneId.of("UTC+8"));
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));
- case FLOAT:
- return convertToFloat();
- case DOUBLE:
- return convertToDouble();
- case CHAR:
- case VARCHAR:
- return convertToString();
- case BINARY:
- case VARBINARY:
- return convertToBinary();
- case DECIMAL:
- return createDecimalConverter((DecimalType) type);
- case ROW:
- return createRowConverter((RowType) type);
- case ARRAY:
- case MAP:
- case MULTISET:
- case RAW:
- default:
- throw new UnsupportedOperationException("Unsupported type: " + type);
- }
- }
-
- private static DeserializationRuntimeConverter convertToNull() {
- return new DeserializationRuntimeConverter() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- return null;
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToByte() {
- return new DeserializationRuntimeConverter() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- return Byte.parseByte(dbzObj.toString());
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToBoolean() {
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- if (dbzObj instanceof Boolean) {
- return dbzObj;
- } else if (dbzObj instanceof Byte) {
- return (byte) dbzObj == 1;
- } else if (dbzObj instanceof Short) {
- return (short) dbzObj == 1;
- } else {
- return Boolean.parseBoolean(dbzObj.toString());
- }
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToShort() {
- return new DeserializationRuntimeConverter() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- return Short.parseShort(dbzObj.toString());
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToInt() {
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- if (dbzObj instanceof Integer) {
- return dbzObj;
- } else if (dbzObj instanceof Long) {
- return ((Long) dbzObj).intValue();
- } else {
- return Integer.parseInt(dbzObj.toString());
- }
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToLong() {
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- if (dbzObj instanceof Integer) {
- return ((Integer) dbzObj).longValue();
- } else if (dbzObj instanceof Long) {
- return dbzObj;
- } else {
- return Long.parseLong(dbzObj.toString());
- }
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
- int precision = decimalType.getPrecision();
- int scale = decimalType.getScale();
- if(precision > 38){
- precision = 38;
- }
-
- if(scale > 10){
- scale = 10;
- }
-
- int finalPrecision = precision;
- int finalScale = scale;
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- BigDecimal bigDecimal;
- if (dbzObj instanceof byte[]) {
- // decimal.handling.mode=precise
- bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
- } else if (dbzObj instanceof String) {
- // decimal.handling.mode=string
- bigDecimal = new BigDecimal((String) dbzObj);
- } else if (dbzObj instanceof Double) {
- // decimal.handling.mode=double
- bigDecimal = BigDecimal.valueOf((Double) dbzObj);
- } else {
- if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
- SpecialValueDecimal decimal =
- VariableScaleDecimal.toLogical((Struct) dbzObj);
- bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
- } else {
- // fallback to string
- bigDecimal = new BigDecimal(dbzObj.toString());
- }
- }
- return DecimalData.fromBigDecimal(bigDecimal, finalPrecision, finalScale);
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToDouble() {
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- if (dbzObj instanceof Float) {
- return ((Float) dbzObj).doubleValue();
- } else if (dbzObj instanceof Double) {
- return dbzObj;
- } else {
- return Double.parseDouble(dbzObj.toString());
- }
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToFloat() {
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- if (dbzObj instanceof Float) {
- return dbzObj;
- } else if (dbzObj instanceof Double) {
- return ((Double) dbzObj).floatValue();
- } else {
- return Float.parseFloat(dbzObj.toString());
- }
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToDate() {
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- return TemporalConversions.toLocalDate(dbzObj);
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToTime() {
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- if (dbzObj instanceof Long) {
- // MicroTime carries microseconds of the day, NanoTime nanoseconds of the day
- long seconds = 0;
- switch (schema.name()) {
- case MicroTime.SCHEMA_NAME:
- seconds = (long) dbzObj / 1_000_000;
- break;
- case NanoTime.SCHEMA_NAME:
- seconds = (long) dbzObj / 1_000_000_000;
- break;
- }
- return LocalTime.ofSecondOfDay(seconds);
- } else if (dbzObj instanceof Integer) {
- // io.debezium.time.Time carries milliseconds of the day
- return LocalTime.ofSecondOfDay((Integer) dbzObj / 1000);
- }
- // fall back to Debezium's generic temporal conversion
- return TemporalConversions.toLocalTime(dbzObj);
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- if (dbzObj instanceof Long) {
- switch (schema.name()) {
- case Timestamp.SCHEMA_NAME:
- return TimestampData.fromEpochMillis((Long) dbzObj);
- case MicroTimestamp.SCHEMA_NAME:
- long micro = (long) dbzObj;
- return TimestampData.fromEpochMillis(
- micro / 1000, (int) (micro % 1000 * 1000));
- case NanoTimestamp.SCHEMA_NAME:
- long nano = (long) dbzObj;
- return TimestampData.fromEpochMillis(
- nano / 1000_000, (int) (nano % 1000_000));
- }
- }
- LocalDateTime localDateTime =
- TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
- return TimestampData.fromLocalDateTime(localDateTime);
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(ZoneId serverTimeZone) {
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- if (dbzObj instanceof String) {
- String str = (String) dbzObj;
- // TIMESTAMP_LTZ type is encoded in string type
- Instant instant = Instant.parse(str);
- return TimestampData.fromLocalDateTime(
- LocalDateTime.ofInstant(instant, serverTimeZone));
- }
- throw new IllegalArgumentException(
- "Unable to convert to TimestampData from unexpected value '"
- + dbzObj
- + "' of type "
- + dbzObj.getClass().getName());
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToString() {
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- return StringData.fromString(dbzObj.toString());
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter convertToBinary() {
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) {
- if (dbzObj instanceof byte[]) {
- return dbzObj;
- } else if (dbzObj instanceof ByteBuffer) {
- ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
- byte[] bytes = new byte[byteBuffer.remaining()];
- byteBuffer.get(bytes);
- return bytes;
- } else {
- throw new UnsupportedOperationException(
- "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
- }
- }
-
- };
- }
-
- private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
- final DeserializationRuntimeConverter[] fieldConverters =
- rowType.getFields().stream()
- .map(RowType.RowField::getType)
- .map(MySQLDebeziumDeserializer::createNotNullConverter)
- .toArray(DeserializationRuntimeConverter[]::new);
- final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
-
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(Object dbzObj, Schema schema) throws Exception {
- Struct struct = (Struct) dbzObj;
- int arity = fieldNames.length;
- Object[] fieldByPosition = new Object[arity];
- LinkedHashMap<String, Integer> positionByName = new LinkedHashMap<>();
- for (int i = 0; i < arity; i++) {
- String fieldName = fieldNames[i];
- Field field = schema.field(fieldName);
- positionByName.put(fieldName, i);
- if (field == null) {
- fieldByPosition[i] = null;
- } else {
- Object fieldValue = struct.getWithoutDefault(fieldName);
- Schema fieldSchema = schema.field(fieldName).schema();
- Object convertedField = convertField(fieldConverters[i], fieldValue, fieldSchema);
- fieldByPosition[i] = convertedField;
- }
- }
- return RowUtils.createRowWithNamedPositions(null, fieldByPosition, positionByName);
- }
-
- };
- }
-
- private static Object convertField(DeserializationRuntimeConverter fieldConverter, Object fieldValue,
- Schema fieldSchema) throws Exception {
- if (fieldValue == null) {
- return null;
- } else {
- return fieldConverter.convert(fieldValue, fieldSchema);
- }
- }
-
- }
Define a custom Greenplum sink by extending Flink's RichSinkFunction and implementing its methods. It receives the tuples emitted by the collector and, based on each row's changelog kind and table name, generates the SQL statements that update the data in Greenplum.
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.collections.CollectionUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
- import org.apache.flink.table.data.DecimalData;
- import org.apache.flink.table.data.StringData;
- import org.apache.flink.table.data.TimestampData;
- import org.apache.flink.types.Row;
- import org.apache.flink.types.RowKind;
-
- import java.sql.*;
- import java.time.LocalDate;
- import java.time.LocalTime;
- import java.util.Map;
- import java.util.Properties;
- import java.util.Set;
- import java.util.StringJoiner;
- import java.util.stream.Collectors;
-
- @Slf4j
- public class DwSink extends RichSinkFunction<Tuple2<String, Row>> {
-
- private Connection connection = null;
- private PreparedStatement stmt = null;
- private Map<String, String> tableMapping;
- private String mapper = null;
- private String schema = null;
-
- private CdcPro cdcPro = null;
-
- public DwSink(CdcPro cdcPro) {
- this.cdcPro = cdcPro;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- // load configuration
- Properties properties = PropUtils.getProperties();
- String url = properties.getProperty("dw_jdbc");
- String username = properties.getProperty("dw_user");
- String password = properties.getProperty("dw_password");
- connection = DriverManager.getConnection(url, username, password);
- // initialize the table-name mapping
- String mapping = properties.getProperty("tableMapping");
- // template-style mapping using the {tableName} placeholder
- if (mapping.contains("{tableName}")) {
- mapper = mapping;
- } else { // map-style mapping parsed from JSON
- this.tableMapping = (Map) JacksonUtil.fromJson(mapping, Map.class);
- }
- schema = properties.getProperty("dw_schema");
- if (StringUtils.isBlank(schema)) {
- schema = "public";
- }
- }
-
- @Override
- public void invoke(Tuple2<String, Row> record, Context context) throws Exception {
- log.debug("invoke:" + record);
- Set<String> fieldNames = record.f1.getFieldNames(true);
- if (CollectionUtils.isEmpty(fieldNames)) {
- log.warn("fieldNames is empty, table:{}", record.f0);
- return;
- }
- if (RowKind.UPDATE_AFTER == record.f1.getKind() || RowKind.INSERT == record.f1.getKind()) {
- doInsert(record, fieldNames);
- } else {
- doDelete(record, fieldNames);
- }
- }
-
- /**
- * Handle delete rows (DELETE and the UPDATE_BEFORE half of an update).
- *
- * @param record record
- * @param fieldNames fieldNames
- * @throws Exception e
- */
- private void doDelete(Tuple2<String, Row> record, Set<String> fieldNames) throws Exception {
- StringJoiner fieldNamesList = new StringJoiner(" and ");
- for (String fieldName : fieldNames) {
- if (record.f1.getField(fieldName) == null) {
- fieldNamesList.add(quoteIdentifier(coverColumn(fieldName)) + " is null");
- } else {
- fieldNamesList.add(quoteIdentifier(coverColumn(fieldName)) + " = ?");
- }
- }
- String sql = "delete from " + this.getMapperTableName(record.f0) + " where " + fieldNamesList;
- log.debug("del sql: {}", sql);
- PreparedStatement stmt = connection.prepareStatement(sql);
- int index = 1;
- for (String fieldName : fieldNames) {
- if (record.f1.getField(fieldName) != null) {
- handlePreparedStatement(stmt, record.f1.getField(fieldName), index);
- index++;
- }
- }
- stmt.execute();
- stmt.close();
- }
-
- /**
- * Handle insert rows (INSERT and UPDATE_AFTER).
- *
- * @param record record
- * @param fieldNames fieldNames
- * @throws Exception e
- */
- private void doInsert(Tuple2<String, Row> record, Set<String> fieldNames) throws Exception {
- StringJoiner fieldNamesList = new StringJoiner(",");
- StringJoiner fieldParmList = new StringJoiner(",");
- for (String fieldName : fieldNames) {
- fieldNamesList.add(quoteIdentifier(coverColumn(fieldName)));
- fieldParmList.add("?");
- }
- String sql = "insert into " + this.getMapperTableName(record.f0) + " ( " + fieldNamesList + " )" +
- " values (" + fieldParmList + ")";
- log.info("insert sql: {}", sql);
- PreparedStatement stmt = connection.prepareStatement(sql);
- int index = 1;
- for (String fieldName : fieldNames) {
- Object field = record.f1.getField(fieldName);
- handlePreparedStatement(stmt, field, index);
- index++;
- }
- stmt.execute();
- stmt.close();
- }
-
- /**
- * Map Flink internal data types to JDBC parameter types.
- *
- * @param stmt stmt
- * @param field field
- * @param index index
- * @throws Exception e
- */
- @Deprecated
- private void handlePreparedStatement(PreparedStatement stmt, Object field, int index) throws Exception {
- // if (field instanceof BinaryStringData) {
- // // JSON columns arrive as BinaryStringData
- // PGobject pGobject = new PGobject();
- // pGobject.setType("json");
- // pGobject.setValue(field.toString());
- // stmt.setObject(index, pGobject);
- // } else
- if (field instanceof StringData) {
- stmt.setString(index, String.valueOf(field));
- } else if (field instanceof DecimalData) {
- stmt.setBigDecimal(index, ((DecimalData) field).toBigDecimal());
- } else if (field instanceof TimestampData) {
- stmt.setTimestamp(index, ((TimestampData) field).toTimestamp());
- } else if (field instanceof Short) {
- stmt.setShort(index, (Short) field);
- } else if (field instanceof Boolean) {
- stmt.setBoolean(index, (Boolean) field);
- } else if (field instanceof Integer) {
- stmt.setInt(index, (Integer) field);
- } else if (field instanceof Long) {
- stmt.setLong(index, (Long) field);
- } else if (field instanceof Double) {
- stmt.setDouble(index, (Double) field);
- } else if (field instanceof Float) {
- stmt.setFloat(index, (Float) field);
- } else if (field instanceof LocalDate) {
- stmt.setDate(index, Date.valueOf((LocalDate) field));
- } else if (field instanceof LocalTime) {
- stmt.setTime(index, Time.valueOf((LocalTime) field));
- } else if (field instanceof byte[]) {
- stmt.setBytes(index, (byte[]) field);
- } else {
- stmt.setObject(index, field);
- }
- }
-
- private String quoteIdentifier(String identifier) {
- return "\"" + identifier + "\"";
- }
-
- // apply the configured column-name case conversion before quoting
- private String coverColumn(String fieldName){
- if(StringUtils.isNotEmpty(fieldName) && StringUtils.isNotEmpty(cdcPro.getColumnConver())){
- if("uppercase".equalsIgnoreCase(cdcPro.getColumnConver())){
- return fieldName.toUpperCase();
- }else if("lowercase".equalsIgnoreCase(cdcPro.getColumnConver())){
- return fieldName.toLowerCase();
- }
- }
- return fieldName;
- }
-
- @Override
- public void close() throws Exception {
- if (stmt != null) {
- stmt.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
-
- /**
- * Resolve the DW table that corresponds to a MySQL table according to the mapping rules.
- *
- * @param originalTableName originalTableName
- * @return the schema-qualified DW table name
- */
- private String getMapperTableName(String originalTableName) {
- String dwTableName;
- if (!StringUtils.isBlank(mapper)) {
- dwTableName = mapper.replace("{tableName}", originalTableName);
- } else {
- dwTableName = tableMapping.get(originalTableName);
- }
- if (StringUtils.isBlank(dwTableName)) {
- log.error("mysql表: " + originalTableName + "没有找到对应的dw库所对应的表!");
- }
- return schema + "." + dwTableName;
- }
-
- }
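PropUtils and JacksonUtil are likewise referenced but not shown in the original post. A minimal sketch, assuming the configuration lives in a cdc.properties file on the classpath (the file name is an assumption) and that Jackson is available:
- import com.fasterxml.jackson.databind.ObjectMapper;
- import java.io.InputStream;
- import java.util.Properties;
-
- public class PropUtils {
-     // Load the job configuration from the classpath; file name assumed.
-     public static Properties getProperties() throws Exception {
-         Properties props = new Properties();
-         try (InputStream in = PropUtils.class.getClassLoader().getResourceAsStream("cdc.properties")) {
-             props.load(in);
-         }
-         return props;
-     }
- }
-
- public class JacksonUtil {
-     private static final ObjectMapper MAPPER = new ObjectMapper();
-     // Deserialize a JSON string into the given type.
-     public static <T> T fromJson(String json, Class<T> clazz) {
-         try {
-             return MAPPER.readValue(json, clazz);
-         } catch (Exception e) {
-             throw new RuntimeException(e);
-         }
-     }
- }
The properties file it loads is the one below.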
- #mysql--source
- mysql_jdbc=jdbc:mysql://ip:3306/test?autoReconnect=true
- mysql_userName=root
- mysql_passWord=123456
- # comma-separated table names; use .* to sync every table in the database
- mysql_tableList=.*
- excluded_tables=t1,t2,table_decimal
- #dw--sink
- dw_jdbc=jdbc:postgresql://ip:5432/postgres?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false
- dw_schema=ods_cdctest1
- dw_user=gpadmin
- dw_password=123456
- # table-name mapping: either a JSON map or the {tableName} template below (choose one)
- tableMapping={tableName}
- column_conver=lowercase
- # or a template pattern, for example:
- #tableMapping=abc_{tableName}
-
- copy_dw_schema=public
- copy_dw_table=test
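For reference, if the target tables do not follow a single naming pattern, tableMapping can instead be the JSON map that DwSink parses via JacksonUtil; a sketch with placeholder table names:
- #tableMapping={"order_info":"ods_order_info","user_info":"ods_user_info"}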
If table-structure (DDL) changes also need to be captured, handle the database schema-change events in the deserializer:
- public void deserialize(SourceRecord record, Collector out) throws Exception {
- Struct value = (Struct) record.value();
- Schema keySchema = record.keySchema();
- // check whether this record is a schema-change event
- if(keySchema != null && keySchema.name().equals(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME)){
- /**
- * The schema-change payload (historyRecord) looks like:
- * {"source":{"file":"mysql-bin.000079","pos":4598526,"server_id":123654},"position":{"transaction_id":null,"ts_sec":1698822218,"file":"mysql-bin.000079","pos":4598686,"server_id":123654},"databaseName":"test1","ddl":"create table table_name1\n(\n id int null\n)","tableChanges":[]}
- */
- String historyRecord = (String) value.get("historyRecord");
- Map recordMap = (Map) JacksonUtil.fromJson(historyRecord, Map.class);
- String ddl = recordMap.get("ddl").toString();
- String database = recordMap.get("databaseName").toString();
- out.collect(Tuple2.of(database,ddl));
- }
- }
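Note that the job shown earlier builds its source with .includeSchemaChanges(false); to receive these schema-change records the source must be built with schema changes enabled, roughly:
- MySqlSource<Tuple2<String, Row>> mySqlSource = MySqlSource.<Tuple2<String, Row>>builder()
-         // ... same connection and deserializer options as before ...
-         .includeSchemaChanges(true)   // emit DDL events to the deserializer
-         .build();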