
Flink CDC Whole-Database Synchronization

Background

The project needs to capture data changes from an external source database and synchronize them to a target database in real time, so that all tables in the source are automatically kept in sync with the target. This article records an implementation for the MySQL -> Greenplum scenario.

Implementation
1. Add the dependencies
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.4.1</version>
</dependency>
<!-- mysql -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.29</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.5.0</version>
</dependency>
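Note: the MySqlCatalog used in the next step (org.apache.flink.connector.jdbc.catalog.MySqlCatalog) is not covered by the dependencies above; it comes from the Flink JDBC connector, so you will most likely also need something like the following (the exact version must match your Flink release and is an assumption here):

<!-- assumed: provides org.apache.flink.connector.jdbc.catalog.MySqlCatalog -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version><!-- match your Flink version --></version>
</dependency>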
2. Create the Flink CDC source

Create the Flink CDC connector: configure the source connection information and the binlog start position, then read and cache the source database's table metadata for later deserialization.

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * Open items: automatic creation of target tables, handling of table schema changes.
 */
@Slf4j
public class FlinkCdcMultiSyncJdbc {

    public static CdcPro cdcPro;
    public static MySqlCatalog mysqlCatalog;
    public static List<String> tables;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        init();
        // Monitor the MySQL binlog: create the Flink-MySQL-CDC source.
        // Startup options:
        // initial (default): take an initial snapshot of the monitored tables on first start, then keep reading the latest binlog
        //                    (with checkpointing enabled, consumption resumes from the last offset)
        // latest-offset: never snapshot on first start, only read from the end of the binlog, i.e. only changes since the connector started
        // timestamp: never snapshot on first start, read the binlog from the beginning and ignore change events earlier than the given timestamp
        // specific-offset: never snapshot on first start, read the binlog from the specified offset
        Properties jdbcProperties = new Properties();
        jdbcProperties.setProperty("time_zone", "+8:00");
        jdbcProperties.setProperty("serverTimeZone", "Asia/Shanghai");
        MySqlSource<Tuple2<String, Row>> mySqlSource = MySqlSource.<Tuple2<String, Row>>builder()
                .hostname(cdcPro.getHost())
                .port(cdcPro.getPort())
                .databaseList(cdcPro.getDb())
                .tableList(tables.toArray(new String[0]))
                // .scanNewlyAddedTableEnabled(true) // enable discovery of newly added tables
                .username(cdcPro.getUserName())
                .password(cdcPro.getPassWord())
                .jdbcProperties(jdbcProperties)
                .deserializer(new MySQLDebeziumDeserializer(buildTableRowTypeMap(cdcPro, tables)))
                .startupOptions(StartupOptions.latest())
                .includeSchemaChanges(false)
                .build();
        SingleOutputStreamOperator<Tuple2<String, Row>> dataStreamSource = env.fromSource(mySqlSource,
                WatermarkStrategy.noWatermarks(), "mysql cdc").disableChaining();
        dataStreamSource.addSink(new DwSink(cdcPro));
        env.execute("flinkcdc_mysql_dw");
    }

    private static void init() throws Exception {
        cdcPro = SourceUtil.createCdcPro();
        String source_url = String.format("jdbc:mysql://%s:%d", cdcPro.getHost(), cdcPro.getPort());
        // Register a catalog for the database being synchronized
        mysqlCatalog = new MySqlCatalog(new ClassLoader() {
            @Override
            public Class<?> loadClass(String name) throws ClassNotFoundException {
                return super.loadClass(name);
            }
        }, "mysql-catalog", cdcPro.getDb(), cdcPro.getUserName(), cdcPro.getPassWord(), source_url);
        // List<String> dbs = mysqlCatalog.listDatabases();
        tables = buildTables(cdcPro);
        System.setProperty("HADOOP_USER_NAME", "root");
    }

    private static List<String> buildTables(CdcPro cdcPro) throws Exception {
        // For whole-database sync take all tables from the catalog, otherwise use the configured table list
        List<String> tables = new ArrayList<>();
        if (".*".equals(cdcPro.getTableList())) {
            tables = mysqlCatalog.listTables(cdcPro.getDb());
        } else {
            tables.addAll(Arrays.asList(cdcPro.getTableList().split(",")));
        }
        // Filter out the excluded tables and qualify the rest with the database name
        tables = tables.stream()
                .filter(tableName -> !cdcPro.getExcludeTables().contains(tableName))
                .map(t -> cdcPro.getDb() + "." + t)
                .collect(Collectors.toList());
        log.info("cdc tables: \n" + tables);
        return tables;
    }

    private static Map<String, RowType> buildTableRowTypeMap(CdcPro cdcPro, List<String> tables) throws Exception {
        // ConcurrentHashMap because the map is populated from a parallel stream
        Map<String, RowType> tableRowTypeMap = new ConcurrentHashMap<>();
        tables.parallelStream().forEach(table -> {
            String tableName = StringUtils.replace(table, cdcPro.getDb() + ".", "");
            // Look up the table registered in the MySQL catalog
            ObjectPath objectPath = new ObjectPath(cdcPro.getDb(), tableName);
            DefaultCatalogTable catalogBaseTable;
            try {
                catalogBaseTable = (DefaultCatalogTable) mysqlCatalog.getTable(objectPath);
            } catch (TableNotExistException e) {
                throw new RuntimeException(e);
            }
            // Get the table schema
            Schema schema = catalogBaseTable.getUnresolvedSchema();
            // Collect the field names and field types of the table
            String[] fieldNames = new String[schema.getColumns().size()];
            LogicalType[] logicalTypes = new LogicalType[schema.getColumns().size()];
            for (int i = 0; i < schema.getColumns().size(); i++) {
                Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) schema.getColumns().get(i);
                fieldNames[i] = column.getName();
                logicalTypes[i] = ((DataType) column.getDataType()).getLogicalType();
            }
            tableRowTypeMap.put(tableName, RowType.of(logicalTypes, fieldNames));
        });
        return tableRowTypeMap;
    }
}
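The CdcPro, SourceUtil, PropUtils, and JacksonUtil helpers referenced above are not included in the original post. Below is a minimal sketch of CdcPro and SourceUtil, assuming the property names from the configuration in section 5 (field names, the Set type for excluded tables, and the URL parsing are all illustrative assumptions; PropUtils is sketched under section 5):

import lombok.Data;
import java.net.URI;
import java.util.*;

// Hypothetical config holder; fields mirror the properties file in section 5.
@Data
public class CdcPro {
    private String host;
    private int port;
    private String db;
    private String userName;
    private String passWord;
    private String tableList;          // ".*" or a comma-separated list of table names
    private Set<String> excludeTables; // tables to skip
    private String columnConver;       // "uppercase" / "lowercase" / empty
}

class SourceUtil {
    // Hypothetical factory that builds a CdcPro from the properties file.
    public static CdcPro createCdcPro() throws Exception {
        Properties p = PropUtils.getProperties();
        // e.g. jdbc:mysql://ip:3306/test?autoReconnect=true -> host/port/db
        URI uri = URI.create(p.getProperty("mysql_jdbc").substring("jdbc:".length()));
        CdcPro pro = new CdcPro();
        pro.setHost(uri.getHost());
        pro.setPort(uri.getPort());
        pro.setDb(uri.getPath().replaceAll("^/", ""));
        pro.setUserName(p.getProperty("mysql_userName"));
        pro.setPassWord(p.getProperty("mysql_passWord"));
        pro.setTableList(p.getProperty("mysql_tableList", ".*"));
        pro.setExcludeTables(new HashSet<>(Arrays.asList(
                p.getProperty("excluded_tables", "").split(","))));
        pro.setColumnConver(p.getProperty("column_conver", ""));
        return pro;
    }
}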
3. Create the data deserializer

Create a custom deserializer that parses the change logs captured by Flink CDC into the affected table, the row data, and the operation type.

Custom converters translate the parsed Debezium values into Flink data types.

Each row's changelog kind (RowKind) is set, and the table name and row are emitted into the Flink collector as a Tuple2.

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.time.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

@Slf4j
public class MySQLDebeziumDeserializer implements DebeziumDeserializationSchema<Tuple2<String, Row>> {

    private final Map<String, RowType> tableRowTypeMap;
    private final Map<String, DeserializationRuntimeConverter> physicalConverterMap = Maps.newConcurrentMap();

    MySQLDebeziumDeserializer(Map<String, RowType> tableRowTypeMap) {
        this.tableRowTypeMap = tableRowTypeMap;
        for (String tableName : this.tableRowTypeMap.keySet()) {
            RowType rowType = this.tableRowTypeMap.get(tableName);
            DeserializationRuntimeConverter physicalConverter = createNotNullConverter(rowType);
            this.physicalConverterMap.put(tableName, physicalConverter);
        }
    }

    @Override
    public void deserialize(SourceRecord record, Collector<Tuple2<String, Row>> out) throws Exception {
        Struct value = (Struct) record.value();
        // log.info("value:{}", value);
        String tableName = value.getStruct("source").get("table").toString();
        // Skip records for tables that were not registered at initialization time
        if (!physicalConverterMap.containsKey(tableName)) {
            return;
        }
        DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tableName);
        Envelope.Operation op = Envelope.operationFor(record);
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            Row insert = extractAfterRow(record, physicalConverter);
            insert.setKind(RowKind.INSERT);
            out.collect(Tuple2.of(tableName, insert));
        } else if (op == Envelope.Operation.DELETE) {
            Row delete = extractBeforeRow(record, physicalConverter);
            delete.setKind(RowKind.DELETE);
            out.collect(Tuple2.of(tableName, delete));
        } else if (op == Envelope.Operation.UPDATE) {
            Row before = extractBeforeRow(record, physicalConverter);
            before.setKind(RowKind.UPDATE_BEFORE);
            out.collect(Tuple2.of(tableName, before));
            Row after = extractAfterRow(record, physicalConverter);
            after.setKind(RowKind.UPDATE_AFTER);
            out.collect(Tuple2.of(tableName, after));
        }
    }

    private Row extractAfterRow(SourceRecord record, DeserializationRuntimeConverter physicalConverter) throws Exception {
        Schema afterSchema = record.valueSchema().field(Envelope.FieldName.AFTER).schema();
        Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
        return (Row) physicalConverter.convert(after, afterSchema);
    }

    private Row extractBeforeRow(SourceRecord record, DeserializationRuntimeConverter physicalConverter) throws Exception {
        Schema beforeSchema = record.valueSchema().field(Envelope.FieldName.BEFORE).schema();
        Struct before = ((Struct) record.value()).getStruct(Envelope.FieldName.BEFORE);
        return (Row) physicalConverter.convert(before, beforeSchema);
    }

    @Override
    public TypeInformation<Tuple2<String, Row>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, Row>>() {
        });
    }
    public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
        /*
         * MySQL type -> Flink logical type:
         * datetime  -> TIMESTAMP_WITHOUT_TIME_ZONE
         * date      -> DATE
         * time      -> TIME_WITHOUT_TIME_ZONE
         * timestamp -> TIMESTAMP_WITHOUT_TIME_ZONE
         */
        switch (type.getTypeRoot()) {
            case NULL:
                return convertToNull();
            case BOOLEAN:
                return convertToInt();
            case TINYINT:
                return convertToInt();
            case SMALLINT:
                return convertToInt();
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
                return convertToInt();
            case BIGINT:
            case INTERVAL_DAY_TIME:
                return convertToLong();
            case DATE:
                return convertToDate();
            case TIME_WITHOUT_TIME_ZONE:
                return convertToTime();
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return convertToTimestamp(ZoneId.of("UTC+8"));
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));
            case FLOAT:
                return convertToFloat();
            case DOUBLE:
                return convertToDouble();
            case CHAR:
            case VARCHAR:
                return convertToString();
            case BINARY:
            case VARBINARY:
                return convertToBinary();
            case DECIMAL:
                return createDecimalConverter((DecimalType) type);
            case ROW:
                return createRowConverter((RowType) type);
            case ARRAY:
            case MAP:
            case MULTISET:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }
    private static DeserializationRuntimeConverter convertToNull() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return null;
            }
        };
    }

    private static DeserializationRuntimeConverter convertToByte() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return Byte.parseByte(dbzObj.toString());
            }
        };
    }

    private static DeserializationRuntimeConverter convertToBoolean() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Boolean) {
                    return dbzObj;
                } else if (dbzObj instanceof Byte) {
                    return (byte) dbzObj == 1;
                } else if (dbzObj instanceof Short) {
                    return (short) dbzObj == 1;
                } else {
                    return Boolean.parseBoolean(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToShort() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return Short.parseShort(dbzObj.toString());
            }
        };
    }

    private static DeserializationRuntimeConverter convertToInt() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return dbzObj;
                } else if (dbzObj instanceof Long) {
                    return ((Long) dbzObj).intValue();
                } else {
                    return Integer.parseInt(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToLong() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return ((Integer) dbzObj).longValue();
                } else if (dbzObj instanceof Long) {
                    return dbzObj;
                } else {
                    return Long.parseLong(dbzObj.toString());
                }
            }
        };
    }
    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
        int precision = decimalType.getPrecision();
        int scale = decimalType.getScale();
        if (precision > 38) {
            precision = 38;
        }
        if (scale > 10) {
            scale = 10;
        }
        int finalPrecision = precision;
        int finalScale = scale;
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                BigDecimal bigDecimal;
                if (dbzObj instanceof byte[]) {
                    // decimal.handling.mode=precise
                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
                } else if (dbzObj instanceof String) {
                    // decimal.handling.mode=string
                    bigDecimal = new BigDecimal((String) dbzObj);
                } else if (dbzObj instanceof Double) {
                    // decimal.handling.mode=double
                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);
                } else {
                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
                        SpecialValueDecimal decimal =
                                VariableScaleDecimal.toLogical((Struct) dbzObj);
                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
                    } else {
                        // fallback to string
                        bigDecimal = new BigDecimal(dbzObj.toString());
                    }
                }
                return DecimalData.fromBigDecimal(bigDecimal, finalPrecision, finalScale);
            }
        };
    }
    private static DeserializationRuntimeConverter convertToDouble() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return ((Float) dbzObj).doubleValue();
                } else if (dbzObj instanceof Double) {
                    return dbzObj;
                } else {
                    return Double.parseDouble(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToFloat() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return dbzObj;
                } else if (dbzObj instanceof Double) {
                    return ((Double) dbzObj).floatValue();
                } else {
                    return Float.parseFloat(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToDate() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return TemporalConversions.toLocalDate(dbzObj);
            }
        };
    }
    private static DeserializationRuntimeConverter convertToTime() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    long seconds = 0;
                    switch (schema.name()) {
                        case MicroTime.SCHEMA_NAME:
                            // microseconds of the day
                            seconds = (long) dbzObj / 1_000_000;
                            break;
                        case NanoTime.SCHEMA_NAME:
                            // nanoseconds of the day
                            seconds = (long) dbzObj / 1_000_000_000L;
                            break;
                    }
                    return LocalTime.ofSecondOfDay(seconds);
                } else if (dbzObj instanceof Integer) {
                    // io.debezium.time.Time carries milliseconds of the day
                    return LocalTime.ofSecondOfDay((Integer) dbzObj / 1000);
                }
                // fallback: milliseconds of the day
                return TemporalConversions.toLocalTime(dbzObj);
            }
        };
    }
    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case Timestamp.SCHEMA_NAME:
                            return TimestampData.fromEpochMillis((Long) dbzObj);
                        case MicroTimestamp.SCHEMA_NAME:
                            long micro = (long) dbzObj;
                            return TimestampData.fromEpochMillis(
                                    micro / 1000, (int) (micro % 1000 * 1000));
                        case NanoTimestamp.SCHEMA_NAME:
                            long nano = (long) dbzObj;
                            return TimestampData.fromEpochMillis(
                                    nano / 1000_000, (int) (nano % 1000_000));
                    }
                }
                LocalDateTime localDateTime =
                        TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
                return TimestampData.fromLocalDateTime(localDateTime);
            }
        };
    }

    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof String) {
                    String str = (String) dbzObj;
                    // TIMESTAMP_LTZ type is encoded as a string
                    Instant instant = Instant.parse(str);
                    return TimestampData.fromLocalDateTime(
                            LocalDateTime.ofInstant(instant, serverTimeZone));
                }
                throw new IllegalArgumentException(
                        "Unable to convert to TimestampData from unexpected value '"
                                + dbzObj
                                + "' of type "
                                + dbzObj.getClass().getName());
            }
        };
    }
    private static DeserializationRuntimeConverter convertToString() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return StringData.fromString(dbzObj.toString());
            }
        };
    }

    private static DeserializationRuntimeConverter convertToBinary() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof byte[]) {
                    return dbzObj;
                } else if (dbzObj instanceof ByteBuffer) {
                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    return bytes;
                } else {
                    throw new UnsupportedOperationException(
                            "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
                }
            }
        };
    }
    private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
        final DeserializationRuntimeConverter[] fieldConverters =
                rowType.getFields().stream()
                        .map(RowType.RowField::getType)
                        .map(MySQLDebeziumDeserializer::createNotNullConverter)
                        .toArray(DeserializationRuntimeConverter[]::new);
        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) throws Exception {
                Struct struct = (Struct) dbzObj;
                int arity = fieldNames.length;
                Object[] fieldByPosition = new Object[arity];
                LinkedHashMap<String, Integer> positionByName = new LinkedHashMap<>();
                for (int i = 0; i < arity; i++) {
                    String fieldName = fieldNames[i];
                    Field field = schema.field(fieldName);
                    positionByName.put(fieldName, i);
                    if (field == null) {
                        fieldByPosition[i] = null;
                    } else {
                        Object fieldValue = struct.getWithoutDefault(fieldName);
                        Schema fieldSchema = schema.field(fieldName).schema();
                        Object convertedField = convertField(fieldConverters[i], fieldValue, fieldSchema);
                        fieldByPosition[i] = convertedField;
                    }
                }
                return RowUtils.createRowWithNamedPositions(null, fieldByPosition, positionByName);
            }
        };
    }

    private static Object convertField(DeserializationRuntimeConverter fieldConverter, Object fieldValue,
                                       Schema fieldSchema) throws Exception {
        if (fieldValue == null) {
            return null;
        } else {
            return fieldConverter.convert(fieldValue, fieldSchema);
        }
    }
}
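To make the output concrete, here is what the deserializer emits for a single UPDATE on a hypothetical table t_user(id, name) (illustrative only; -U/+U are Flink's printed changelog prefixes for UPDATE_BEFORE/UPDATE_AFTER):

// Two Tuple2<String, Row> records, consumed by the DwSink defined in the next step:
//   ("t_user", -U[1, old_name])   // RowKind.UPDATE_BEFORE, built from the Debezium "before" struct
//   ("t_user", +U[1, new_name])   // RowKind.UPDATE_AFTER,  built from the Debezium "after" struct
// An INSERT produces a single +I record and a DELETE a single -D record.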
4. Custom Greenplum sink operator

Implement a custom Greenplum sink by extending Flink's RichSinkFunction and overriding its lifecycle methods.

The sink receives the records emitted by the collector and, based on each row's changelog kind and table name, generates the SQL statements that update the data in Greenplum.

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.sql.*;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;

@Slf4j
public class DwSink extends RichSinkFunction<Tuple2<String, Row>> {

    private Connection connection = null;
    private PreparedStatement stmt = null;
    private Map<String, String> tableMapping;
    private String mapper = null;
    private String schema = null;
    private CdcPro cdcPro = null;

    public DwSink(CdcPro cdcPro) {
        this.cdcPro = cdcPro;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Read the configuration
        Properties properties = PropUtils.getProperties();
        String url = properties.getProperty("dw_jdbc");
        String username = properties.getProperty("dw_user");
        String password = properties.getProperty("dw_password");
        connection = DriverManager.getConnection(url, username, password);
        // Initialize the table name mapping
        String mapping = properties.getProperty("tableMapping");
        // Template-based mapping
        if (mapping.contains("{tableName}")) {
            mapper = mapping;
        } else { // Map-based mapping
            this.tableMapping = (Map) JacksonUtil.fromJson(mapping, Map.class);
        }
        schema = properties.getProperty("dw_schema");
        if (StringUtils.isBlank(schema)) {
            schema = "public";
        }
    }

    @Override
    public void invoke(Tuple2<String, Row> record, Context context) throws Exception {
        log.debug("invoke:" + record);
        Set<String> fieldNames = record.f1.getFieldNames(true);
        if (CollectionUtils.isEmpty(fieldNames)) {
            log.warn("fieldNames is empty, table:{}", record.f0);
            return;
        }
        if (RowKind.UPDATE_AFTER == record.f1.getKind() || RowKind.INSERT == record.f1.getKind()) {
            doInsert(record, fieldNames);
        } else {
            doDelete(record, fieldNames);
        }
    }
    /**
     * Handle deletes
     *
     * @param record     record
     * @param fieldNames fieldNames
     * @throws Exception e
     */
    private void doDelete(Tuple2<String, Row> record, Set<String> fieldNames) throws Exception {
        StringJoiner fieldNamesList = new StringJoiner(" and ");
        for (String fieldName : fieldNames) {
            if (record.f1.getField(fieldName) == null) {
                fieldNamesList.add(quoteIdentifier(coverColumn(fieldName)) + " is null");
            } else {
                fieldNamesList.add(quoteIdentifier(coverColumn(fieldName)) + " = ?");
            }
        }
        String sql = "delete from " + this.getMapperTableName(record.f0) + " where " + fieldNamesList;
        log.debug("del sql: {}", sql);
        PreparedStatement stmt = connection.prepareStatement(sql);
        int index = 1;
        for (String fieldName : fieldNames) {
            if (record.f1.getField(fieldName) != null) {
                handlePreparedStatement(stmt, record.f1.getField(fieldName), index);
                index++;
            }
        }
        stmt.execute();
        stmt.close();
    }
    /**
     * Handle inserts
     *
     * @param record     record
     * @param fieldNames fieldNames
     * @throws Exception e
     */
    private void doInsert(Tuple2<String, Row> record, Set<String> fieldNames) throws Exception {
        StringJoiner fieldNamesList = new StringJoiner(",");
        StringJoiner fieldParmList = new StringJoiner(",");
        for (String fieldName : fieldNames) {
            fieldNamesList.add(quoteIdentifier(coverColumn(fieldName)));
            fieldParmList.add("?");
        }
        String sql = "insert into " + this.getMapperTableName(record.f0) + " ( " + fieldNamesList + " )" +
                " values (" + fieldParmList + ")";
        log.info("insert sql: {}", sql);
        PreparedStatement stmt = connection.prepareStatement(sql);
        int index = 1;
        for (String fieldName : fieldNames) {
            Object field = record.f1.getField(fieldName);
            handlePreparedStatement(stmt, field, index);
            index++;
        }
        stmt.execute();
        stmt.close();
    }
    /**
     * Map Flink data types to JDBC parameter types
     *
     * @param stmt  stmt
     * @param field field
     * @param index index
     * @throws Exception e
     */
    @Deprecated
    private void handlePreparedStatement(PreparedStatement stmt, Object field, int index) throws Exception {
        // if (field instanceof BinaryStringData) {
        //     // JSON columns are deserialized as BinaryStringData
        //     PGobject pGobject = new PGobject();
        //     pGobject.setType("json");
        //     pGobject.setValue(field.toString());
        //     stmt.setObject(index, pGobject);
        // } else
        if (field instanceof StringData) {
            stmt.setString(index, String.valueOf(field));
        } else if (field instanceof DecimalData) {
            stmt.setBigDecimal(index, ((DecimalData) field).toBigDecimal());
        } else if (field instanceof TimestampData) {
            stmt.setTimestamp(index, ((TimestampData) field).toTimestamp());
        } else if (field instanceof Short) {
            stmt.setShort(index, (Short) field);
        } else if (field instanceof Boolean) {
            stmt.setBoolean(index, (Boolean) field);
        } else if (field instanceof Integer) {
            stmt.setInt(index, (Integer) field);
        } else if (field instanceof Long) {
            stmt.setLong(index, (Long) field);
        } else if (field instanceof Double) {
            stmt.setDouble(index, (Double) field);
        } else if (field instanceof Float) {
            stmt.setFloat(index, (Float) field);
        } else if (field instanceof LocalDate) {
            stmt.setDate(index, Date.valueOf((LocalDate) field));
        } else if (field instanceof LocalTime) {
            stmt.setTime(index, Time.valueOf((LocalTime) field));
        } else if (field instanceof byte[]) {
            stmt.setBytes(index, (byte[]) field);
        } else {
            stmt.setObject(index, field);
        }
    }
    private String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    private String coverColumn(String fieldName) {
        if (StringUtils.isNotEmpty(fieldName) && StringUtils.isNotEmpty(cdcPro.getColumnConver())) {
            if ("uppercase".equalsIgnoreCase(cdcPro.getColumnConver())) {
                return fieldName.toUpperCase();
            } else if ("lowercase".equalsIgnoreCase(cdcPro.getColumnConver())) {
                return fieldName.toLowerCase();
            }
        }
        return fieldName;
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) {
            stmt.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
    /**
     * Map a MySQL table name to the corresponding DW table according to the mapping rule
     *
     * @param originalTableName originalTableName
     * @return the schema-qualified DW table name
     */
    private String getMapperTableName(String originalTableName) {
        String dwTableName;
        if (!StringUtils.isBlank(mapper)) {
            dwTableName = mapper.replace("{tableName}", originalTableName);
        } else {
            dwTableName = tableMapping.get(originalTableName);
        }
        if (StringUtils.isBlank(dwTableName)) {
            log.error("No matching DW table found for MySQL table: " + originalTableName);
        }
        return schema + "." + dwTableName;
    }
}
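To make the sink's behavior concrete: UPDATE_BEFORE/DELETE rows become DELETE statements and UPDATE_AFTER/INSERT rows become INSERT statements, so the UPDATE from the earlier example (hypothetical table t_user, with tableMapping={tableName}, dw_schema=ods_cdctest1, column_conver=lowercase as configured below) is applied to Greenplum roughly as:

    delete from ods_cdctest1.t_user where "id" = ? and "name" = ?   -- parameters bound from the UPDATE_BEFORE row
    insert into ods_cdctest1.t_user ( "id","name" ) values (?,?)    -- parameters bound from the UPDATE_AFTER row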
5. Configuration
#mysql -- source
mysql_jdbc=jdbc:mysql://ip:3306/test?autoReconnect=true
mysql_userName=root
mysql_passWord=123456
#Comma-separated table names; to sync every table in the database, use .*
mysql_tableList=.*
excluded_tables=t1,t2,table_decimal
#dw -- sink
dw_jdbc=jdbc:postgresql://ip:5432/postgres?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false
dw_schema=ods_cdctest1
dw_user=gpadmin
dw_password=123456
#Table name mapping: either a map, or the template form below; choose one
tableMapping={tableName}
column_conver=lowercase
#or a template pattern, e.g.:
#tableMapping=abc_{tableName}
copy_dw_schema=public
copy_dw_table=test
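PropUtils and JacksonUtil are small utility classes not shown in the original post. A minimal sketch, assuming the configuration above lives on the classpath (the file name and everything except the method signatures used earlier is an assumption):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.InputStream;
import java.util.Properties;

class PropUtils {
    private static final Properties PROPS = new Properties();
    static {
        try (InputStream in = PropUtils.class.getClassLoader()
                .getResourceAsStream("application.properties")) { // assumed file name
            PROPS.load(in);
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    public static Properties getProperties() {
        return PROPS;
    }
}

class JacksonUtil {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    public static <T> T fromJson(String json, Class<T> clazz) {
        try {
            return MAPPER.readValue(json, clazz);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}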
Other notes

If you also need to capture table schema changes, you can handle the database schema-change events in the deserializer:

public void deserialize(SourceRecord record, Collector out) throws Exception {
    Struct value = (Struct) record.value();
    Schema keySchema = record.keySchema();
    // Check whether this record is a schema-change event
    if (keySchema != null && keySchema.name().equals(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME)) {
        /**
         * The captured schema-change information looks like:
         * {"source":{"file":"mysql-bin.000079","pos":4598526,"server_id":123654},"position":{"transaction_id":null,"ts_sec":1698822218,"file":"mysql-bin.000079","pos":4598686,"server_id":123654},"databaseName":"test1","ddl":"create table table_name1\n(\n id int null\n)","tableChanges":[]}
         */
        String historyRecord = (String) value.get("historyRecord");
        Map recordMap = (Map) JacksonUtil.fromJson(historyRecord, Map.class);
        String ddl = recordMap.get("ddl").toString();
        String database = recordMap.get("databaseName").toString();
        out.collect(Tuple2.of(database, ddl));
    }
}
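Note that the source built in step 2 sets .includeSchemaChanges(false), so no schema-change events would reach the deserializer. To receive them, the source has to be built with the option enabled, roughly as sketched here (the deserializer's output type must also accommodate whatever you emit for DDL events):

MySqlSource.<Tuple2<String, Row>>builder()
        // ... same connection settings as in step 2 ...
        .includeSchemaChanges(true) // forward DDL / schema-change events to the custom deserializer
        .build();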
