赞
踩
前面一章已经把flinkcdc-mysql-doris的环境搭建好了;
要想实现mysql数据同步到doris,首先要解决以下几个问题:
1、在同步的时候,怎么实现在doris里面自动创建表结构。如果不是自动创建表结构,那么每次同步一个表都需要手动去创建表结构,这样的话会很累;
2、多个表或者整库同步的时候,怎么通过只连接一次,就可以监听多个表或者整库的数据变动;
3、表结构的变更如何同步;
4、分库分表如何同步;
所以呢,这篇文章先解决第一个问题,如何自动同步表结构到doris;
表结构的同步有两种方式:一种使用Flink的Catalog;另外一种使用jdbc的方式,直接读取mysql的表结构信息,然后通过jdbc的方式连接doris创建表结构;
在这里采用第二种:使用jdbc的方式,直接读取mysql的表结构信息,然后通过jdbc的方式连接doris创建表结构;
- public static List<String> getSchemaTableNameList(List<Schema> schemaList, FlinkCDCConfig config, CDCBuilder cdcBuilder) throws Exception {
- final List<String> schemaNameList = cdcBuilder.getSchemaList();
- final List<String> tableRegList = cdcBuilder.getTableList();
- final List<String> schemaTableNameList = new ArrayList<>();
- Map<String, String> splitConfig = config.getSplit();
- DriverConfig driverConfig = DriverConfig.build(cdcBuilder.parseMetaDataConfig());
- Driver driver = Driver.build(driverConfig);
- Map<String, Map<String, String>> allConfigMap = cdcBuilder.parseMetaDataConfigs();
- Set<Table> tables = driver.getSplitTables(tableRegList, splitConfig);
- for(Table table:tables){
- String schemaName = table.getSchema();
- Schema schema = Schema.build(schemaName);
- schema.setTables(Collections.singletonList(table));
- // 分库分表所有表结构都是一样的,取出列表中第一个表名即可
- String schemaTableName = table.getSchemaTableNameList().get(0);
- // 真实的表名
- String tableName = schemaTableName.split("\\.")[1];
- table.setColumns(driver.listColumnsSortByPK(schemaName, tableName));
- table.setColumns(driver.listColumnsSortByPK(schemaName, table.getName()));
- schemaList.add(schema);
- }
- for(String schemaName:schemaNameList){
- Schema schema = Schema.build(schemaName);
- if (!allConfigMap.containsKey(schemaName)) {
- continue;
- }
- Map<String, String> sink = config.getSink();
- Driver sinkDriver = DriverSchema.checkAndCreateSinkSchema(sink, schemaName);
-
- DriverConfig driverConfig1 = DriverConfig.build(allConfigMap.get(schemaName));
- Driver driver1 = Driver.build(driverConfig1);
- final List<Table> tables1 = driver1.listTables(schemaName);
- for (Table table : tables1) {
- if (!Asserts.isEquals(table.getType(), "VIEW")) {
- if (Asserts.isNotNullCollection(tableRegList)) {
- for (String tableReg : tableRegList) {
- if (table.getSchemaTableName().matches(tableReg.trim())
- && !schema.getTables()
- .contains(Table.build(table.getName()))) {
- table.setColumns(
- driver.listColumnsSortByPK(
- schemaName, table.getName()));
- schema.getTables().add(table);
- schemaTableNameList.add(table.getSchemaTableName());
- break;
- }
- }
- } else {
- table.setColumns(
- driver.listColumnsSortByPK(schemaName, table.getName()));
- schemaTableNameList.add(table.getSchemaTableName());
- schema.getTables().add(table);
- }
- }
- }
- if (null != sinkDriver) {
- for (Table table : schema.getTables()) {
- Table sinkTable = (Table) table.clone();
- sinkTable.setSchema(table.getSchema());
- sinkTable.setName(table.getName());
- checkAndCreateSinkTable(sinkDriver, sinkTable);
- }
- }
- schemaList.add(schema);
- }
- return schemaTableNameList;
- }
以上就是通过jdbc读取mysql的schema信息,插入到doris。
checkAndCreateSinkTable就是DDL语句插入到doris里面:
flinkcdc-mysql-doris的项目地址:https://gitee.com/jwandbj/flinkcdc-mysql-doris
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。