当前位置:   article > 正文

FlinkCDC(三)同步mysql的binlog到doris_flink binlog doris

flink binlog doris

前面一章已经把flinkcdc-mysql-doris的环境搭建好了;

要想实现mysql数据同步到doris,首先要解决以下几个问题:

1、在同步的时候,怎么实现在doris里面自动创建表结构。如果不是自动创建表结构,那么每次同步一个表都需要手动去创建表结构,这样的话会很累;

2、多个表或者整库同步的时候,怎么通过只连接一次,就可以监听多个表或者整库的数据变动;

3、表结构的变更如何同步;

4、分库分表如何同步;

所以呢,这篇文章先解决第一个问题,如何自动同步表结构到doris;

表结构的同步有两种方式:一种使用Flink的Catalog;另外一种使用jdbc的方式,直接读取mysql的表结构信息,然后通过jdbc的方式连接doris创建表结构;

在这里采用第二种:使用jdbc的方式,直接读取mysql的表结构信息,然后通过jdbc的方式连接doris创建表结构;

  1. public static List<String> getSchemaTableNameList(List<Schema> schemaList, FlinkCDCConfig config, CDCBuilder cdcBuilder) throws Exception {
  2. final List<String> schemaNameList = cdcBuilder.getSchemaList();
  3. final List<String> tableRegList = cdcBuilder.getTableList();
  4. final List<String> schemaTableNameList = new ArrayList<>();
  5. Map<String, String> splitConfig = config.getSplit();
  6. DriverConfig driverConfig = DriverConfig.build(cdcBuilder.parseMetaDataConfig());
  7. Driver driver = Driver.build(driverConfig);
  8. Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
  9. Set<Table> tables = driver.getSplitTables(tableRegList, splitConfig);
  10. for(Table table:tables){
  11. String schemaName = table.getSchema();
  12. Schema schema = Schema.build(schemaName);
  13. schema.setTables(Collections.singletonList(table));
  14. // 分库分表所有表结构都是一样的,取出列表中第一个表名即可
  15. String schemaTableName = table.getSchemaTableNameList().get(0);
  16. // 真实的表名
  17. String tableName = schemaTableName.split("\\.")[1];
  18. table.setColumns(driver.listColumnsSortByPK(schemaName, tableName));
  19. table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
  20. schemaList.add(schema);
  21. }
  22. for(String schemaName:schemaNameList){
  23. Schema schema = Schema.build(schemaName);
  24. if (!allConfigMap.containsKey(schemaName)) {
  25. continue;
  26. }
  27. Map<String, String> sink = config.getSink();
  28. Driver sinkDriver = DriverSchema.checkAndCreateSinkSchema(sink, schemaName);
  29. DriverConfig driverConfig1 = DriverConfig.build(allConfigMap.get(schemaName));
  30. Driver driver1 = Driver.build(driverConfig1);
  31. final List<Table> tables1 = driver1.listTables(schemaName);
  32. for (Table table : tables1) {
  33. if (!Asserts.isEquals(table.getType(), "VIEW")) {
  34. if (Asserts.isNotNullCollection(tableRegList)) {
  35. for (String tableReg : tableRegList) {
  36. if (table.getSchemaTableName().matches(tableReg.trim())
  37. && !schema.getTables()
  38. .contains(Table.build(table.getName()))) {
  39. table.setColumns(
  40. driver.listColumnsSortByPK(
  41. schemaName, table.getName()));
  42. schema.getTables().add(table);
  43. schemaTableNameList.add(table.getSchemaTableName());
  44. break;
  45. }
  46. }
  47. } else {
  48. table.setColumns(
  49. driver.listColumnsSortByPK(schemaName, table.getName()));
  50. schemaTableNameList.add(table.getSchemaTableName());
  51. schema.getTables().add(table);
  52. }
  53. }
  54. }
  55. if (null != sinkDriver) {
  56. for (Table table : schema.getTables()) {
  57. Table sinkTable = (Table) table.clone();
  58. sinkTable.setSchema(table.getSchema());
  59. sinkTable.setName(table.getName());
  60. checkAndCreateSinkTable(sinkDriver, sinkTable);
  61. }
  62. }
  63. schemaList.add(schema);
  64. }
  65. return schemaTableNameList;
  66. }

以上就是通过jdbc读取mysql的schema信息,插入到doris。

checkAndCreateSinkTable就是DDL语句插入到doris里面:

flinkcdc-mysql-doris的项目地址:https://gitee.com/jwandbj/flinkcdc-mysql-doris 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/398635
推荐阅读
相关标签
  

闽ICP备14008679号