赞
踩
要做大量的datax任务来同步mysql到doris,需要写很多datax的json文件。为了省事,写了一个工具,只要提供doris的建表语句即可生成json。
一共用到了五个文件:conf.json、datax_example.json、datax_output.json、dorisDDL.sql 和 Mysql2dorisDataxTools.py。
conf.json:这里是Mysql和doris的连接信息
其中 table_prefix 字段是因为mysql到doris时表会加前缀,如果你是同名表 就用不到。
{ "mysql": { "host": "", "port": 3306, "user": "root", "password": "", "database": "", "table_prefix": "" }, "doris": { "host": "", "port": 9030, "user": "", "password": "", "loadUrl": ["xxx:8030"], "preSql": [] } }
datax_example.json:这里是一个标准的json文件,生成时会用它做模板,修改成你想要的json
如果你想提高速度,可以在这里修改channel
其他关于模版的修改 也在这里改。
{ "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "", "password": "", "splitPk": "", "connection": [ { "querySql": [], "jdbcUrl": [] } ] } }, "writer": { "name": "doriswriter", "parameter": { "username": "", "password": "", "loadUrl": [ ], "TwoPhaseCommit": "true", "column": [], "preSql": [], "flushInterval": 30000, "connection": [ { "table": [ "" ], "jdbcUrl": "", "selectedDatabase": "" } ], "loadProps": { "format": "json", "strip_outer_array": "true", "line_delimiter": "\\x02" } } } } ] } }
datax_output.json:这是最终生成的json文件,不需要提前创建
dorisDDL.sql:这里是doris的建表语句。需要注意的是,请带上库名,因为datax json里需要写库名,而库名是从这里解析的。
-- Sample Doris DDL used as input for the DataX JSON generator.
-- The table name must be qualified with its database (here: test), because
-- the generator takes the Doris database name from this statement.
CREATE TABLE `test`.`merchant` (
`merchant_code` VARCHAR(60) NOT NULL COMMENT '商户编码',
`merchant_name` VARCHAR(100) DEFAULT NULL COMMENT '商户名称',
`merchant_type` VARCHAR(60) DEFAULT NULL COMMENT '商户类型'
) ENGINE = OLAP
UNIQUE KEY(`merchant_code`)
DISTRIBUTED BY HASH(`merchant_code`) BUCKETS 20
PROPERTIES (
"replication_num" = "1",
"storage_type" = "COLUMN"
);
这就是核心代码了
"""Generate a DataX MySQL -> Doris job file from a Doris CREATE TABLE statement.

Inputs (read from the current directory when run as a script):
    conf.json           connection settings for MySQL and Doris
    datax_example.json  DataX job template that gets filled in
    dorisDDL.sql        Doris CREATE TABLE statement (database-qualified)

Output:
    datax_output.json   the generated DataX job
"""
import json
import re

_QUOTE_CHARS = "'\"`"

# Leading keywords of entries in a column list that are not column definitions.
_NON_COLUMN_KEYWORDS = frozenset(
    {'INDEX', 'KEY', 'PRIMARY', 'UNIQUE', 'CONSTRAINT', 'FOREIGN', 'FULLTEXT'}
)

_TABLE_RE = re.compile(
    r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(?:(`?)(\w+)\1\.)?(`?)(\w+)\3',
    re.IGNORECASE,
)

# A column definition is "<name> <type...>"; the name may be backtick-quoted.
_COLUMN_NAME_RE = re.compile(r'\s*(?:`([^`]+)`|(\w+))\s+\w')


def read_and_format_json(file_path):
    """Load the JSON document stored at ``file_path`` and return it."""
    with open(file_path, 'r', encoding='utf-8') as handle:
        return json.load(handle)


def _strip_sql_comments(sql):
    """Remove ``--`` and ``/* */`` comments that are outside quoted text.

    Quote awareness matters: a COMMENT literal such as '状态--0:无效' must
    survive untouched.
    """
    out = []
    quote = None
    i = 0
    n = len(sql)
    while i < n:
        ch = sql[i]
        if quote is not None:
            out.append(ch)
            # Backslash escapes apply to string literals, not to identifiers.
            if ch == '\\' and quote != '`' and i + 1 < n:
                out.append(sql[i + 1])
                i += 2
                continue
            if ch == quote:
                quote = None
            i += 1
        elif ch in _QUOTE_CHARS:
            quote = ch
            out.append(ch)
            i += 1
        elif sql.startswith('--', i):
            end = sql.find('\n', i)
            i = n if end == -1 else end  # keep the newline itself
        elif sql.startswith('/*', i):
            end = sql.find('*/', i + 2)
            i = n if end == -1 else end + 2
        else:
            out.append(ch)
            i += 1
    return ''.join(out)


def _iter_unquoted(text):
    """Yield ``(index, char)`` for each character outside quoted text."""
    quote = None
    i = 0
    n = len(text)
    while i < n:
        ch = text[i]
        if quote is not None:
            if ch == '\\' and quote != '`':
                i += 2
                continue
            if ch == quote:
                quote = None
        elif ch in _QUOTE_CHARS:
            quote = ch
        else:
            yield i, ch
        i += 1


def _extract_column_section(sql, start):
    """Return the text inside the first balanced ``(...)`` at or after ``start``.

    Returns ``None`` when no balanced pair exists.
    """
    tail = sql[start:]
    depth = 0
    open_idx = None
    for idx, ch in _iter_unquoted(tail):
        if ch == '(':
            if depth == 0:
                open_idx = idx
            depth += 1
        elif ch == ')' and depth:
            depth -= 1
            if depth == 0:
                return tail[open_idx + 1:idx]
    return None


def _split_top_level_commas(text):
    """Split ``text`` on commas that are outside quotes and parentheses."""
    parts = []
    depth = 0
    begin = 0
    for idx, ch in _iter_unquoted(text):
        if ch == '(':
            depth += 1
        elif ch == ')':
            depth -= 1
        elif ch == ',' and depth == 0:
            parts.append(text[begin:idx])
            begin = idx + 1
    parts.append(text[begin:])
    return parts


def _column_name(definition):
    """Return the column name of one column-list entry, or ``None``.

    Unquoted entries starting with an index/constraint keyword are not columns.
    """
    match = _COLUMN_NAME_RE.match(definition)
    if match is None:
        return None
    quoted, bare = match.groups()
    if quoted is not None:
        return quoted
    if bare.upper() in _NON_COLUMN_KEYWORDS:
        return None
    return bare


def parse_create_table_sql(file):
    """Parse a CREATE TABLE statement stored in the file at path ``file``.

    Returns:
        A dict ``{'database': str | None, 'table': str, 'columns': list[str]}``,
        or ``None`` when no CREATE TABLE statement or no column list is found.
        ``database`` is ``None`` when the table name is not database-qualified.
    """
    with open(file, 'r', encoding='utf-8') as handle:
        sql = _strip_sql_comments(handle.read())

    table_match = _TABLE_RE.search(sql)
    if not table_match:
        return None

    # Only the parenthesised list right after the table name holds columns;
    # KEY(...), HASH(...) and PROPERTIES(...) that follow must be ignored.
    column_section = _extract_column_section(sql, table_match.end())
    if column_section is None:
        return None

    names = (_column_name(part) for part in _split_top_level_commas(column_section))
    return {
        'database': table_match.group(2),
        'table': table_match.group(4),
        'columns': [name for name in names if name is not None],
    }


def get_select(columns, table_name, tablename_prefix):
    """Build the MySQL ``SELECT`` statement used as the reader's querySql.

    ``tablename_prefix`` is removed from ``table_name`` only when it is a real
    prefix; occurrences elsewhere in the name are left alone.
    """
    columns = [s.strip() for s in columns]
    if tablename_prefix and table_name.startswith(tablename_prefix):
        table_name = table_name[len(tablename_prefix):]
    return 'SELECT ' + '`' + '`,`'.join(columns) + '`' + ' FROM ' + table_name


def get_column(columns):
    """Return the column names wrapped in backticks for the Doris writer."""
    return ['`' + c + '`' for c in columns]


def build_datax_job(conf, template, ddl_info):
    """Fill the DataX job ``template`` in place and return it.

    Args:
        conf: parsed conf.json (``mysql`` and ``doris`` sections).
        template: parsed DataX job template (datax_example.json).
        ddl_info: result of :func:`parse_create_table_sql`.

    Raises:
        ValueError: if the DDL could not be parsed or lacks a database name.
    """
    if ddl_info is None:
        raise ValueError('could not parse a CREATE TABLE statement from the DDL')
    if not ddl_info['database']:
        raise ValueError(
            'the DDL must qualify the table with its database, '
            'e.g. CREATE TABLE `db`.`table`'
        )

    mysql = conf['mysql']
    doris = conf['doris']
    content = template['job']['content'][0]

    # reader (MySQL)
    reader = content['reader']['parameter']
    reader['username'] = mysql['user']
    reader['password'] = mysql['password']
    reader['connection'][0]['querySql'] = [
        get_select(ddl_info['columns'], ddl_info['table'], mysql.get('table_prefix') or '')
    ]
    reader['connection'][0]['jdbcUrl'] = [
        'jdbc:mysql://' + mysql['host'] + ':' + str(mysql['port']) + '/' + mysql['database']
        + '?useSSL=false&serverTimezone=Asia/Shanghai'
    ]

    # writer (Doris)
    writer = content['writer']['parameter']
    writer['username'] = doris['user']
    writer['password'] = doris['password']
    writer['loadUrl'] = doris['loadUrl']
    writer['column'] = get_column(ddl_info['columns'])
    writer['preSql'] = doris['preSql']
    writer['connection'][0]['table'] = [ddl_info['table']]
    writer['connection'][0]['jdbcUrl'] = (
        'jdbc:mysql://' + doris['host'] + ':' + str(doris['port']) + '/' + ddl_info['database']
    )
    writer['connection'][0]['selectedDatabase'] = ddl_info['database']
    return template


def main():
    """Read the three input files, print the job and write datax_output.json."""
    conf = read_and_format_json('conf.json')
    js_demo = read_and_format_json('datax_example.json')
    ddl_info = parse_create_table_sql('dorisDDL.sql')

    build_datax_job(conf, js_demo, ddl_info)
    print(json.dumps(js_demo))

    with open('datax_output.json', 'w', encoding='utf-8') as file:
        json.dump(js_demo, file, ensure_ascii=False, indent=4)


if __name__ == '__main__':
    main()
只要把以上文件放在一个目录里,直接执行Mysql2dorisDataxTools.py 就可以了。
如果你懒得写,直接用下面的压缩包吧。
https://download.csdn.net/download/weixin_45399602/89644991?spm=1001.2014.3001.5503
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。