Python tool: a DataX JSON generator for mysql2doris (MySQL to Doris)

1. Overview

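Syncing many MySQL tables to Doris with DataX means writing a lot of DataX job JSON files. To save effort, I wrote a tool that generates the JSON from nothing more than the Doris CREATE TABLE statement.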

2. Files

Five files are involved in total.

2.1 conf.json

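This file holds the MySQL and Doris connection settings.
The table_prefix field exists because in my setup the tables get a prefix when they move from MySQL to Doris: the Doris table name is the MySQL table name plus the prefix, and the tool strips the prefix to get the MySQL table name. If the tables have the same name on both sides, leave it empty.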

{
  "mysql": {
    "host": "",
    "port": 3306,
    "user": "root",
    "password": "",
    "database": "",
    "table_prefix": ""
  },
  "doris": {
    "host": "",
    "port": 9030,
    "user": "",
    "password": "",
    "loadUrl": ["xxx:8030"],
    "preSql": []
  }
}
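Because conf.json is read as a plain dict, a mistyped key only shows up later as a KeyError. A minimal sketch of a pre-check, plus the prefix mapping described above; `check_conf` and `mysql_table_name` are hypothetical helpers, not part of the original tool, and the key list is taken from the sample conf.json.

```python
# Keys the generator reads from conf.json (taken from the sample conf.json)
REQUIRED_KEYS = {
    "mysql": ["host", "port", "user", "password", "database", "table_prefix"],
    "doris": ["host", "port", "user", "password", "loadUrl", "preSql"],
}


def check_conf(conf):
    """Return the 'section.key' names missing from a parsed conf.json."""
    missing = []
    for section, keys in REQUIRED_KEYS.items():
        block = conf.get(section, {})
        missing.extend(section + "." + key for key in keys if key not in block)
    return missing


def mysql_table_name(doris_table, table_prefix):
    """Map a Doris table name back to its MySQL source table.

    Only a leading table_prefix is removed; with an empty prefix the
    table has the same name on both sides.
    """
    if table_prefix and doris_table.startswith(table_prefix):
        return doris_table[len(table_prefix):]
    return doris_table
```

With a hypothetical prefix `ods_`, `mysql_table_name("ods_merchant", "ods_")` gives `"merchant"`. Stripping only a leading prefix avoids also deleting the same text if it appears elsewhere in the table name, which a plain `str.replace` would do.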

2.2 datax_example.json

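This is a standard DataX job file. The tool uses it as a template and fills in the fields to produce the job you need.
If you want more speed, change channel here.
Any other changes to the template also go here.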


{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "",
                        "password": "",
                        "splitPk": "",
                        "connection": [
                            {
                                "querySql": [],
                                "jdbcUrl": []
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "username": "",
                        "password": "",
                        "loadUrl": [

                        ],
                        "TwoPhaseCommit": "true",
                        "column": [],
                        "preSql": [],
                        "flushInterval": 30000,
                        "connection": [
                            {
                                "table": [
                                    ""
                                ],
                                "jdbcUrl": "",
                                "selectedDatabase": ""
                            }
                        ],
                        "loadProps": {
                            "format": "json",
                            "strip_outer_array": "true",
                            "line_delimiter": "\\x02"
                        }
                    }
                }
            }
        ]
    }
}

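Editing channel in datax_example.json changes it for every job generated afterwards. If you want a different value for a particular table, a small sketch can set it on a copy of the loaded template instead; the `with_channel` helper is hypothetical.

```python
import copy


def with_channel(template, channel):
    """Return a copy of a DataX job template with job.setting.speed.channel set.

    The original template dict is not modified, so it can be reused.
    """
    if not isinstance(channel, int) or channel < 1:
        raise ValueError("channel must be a positive integer")
    job = copy.deepcopy(template)
    job["job"]["setting"]["speed"]["channel"] = channel
    return job
```

For example, `with_channel(read_and_format_json('datax_example.json'), 4)` returns a job with channel 4 while the loaded template dict keeps its original value.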

2.3 datax_output.json

This is the generated job file; it does not need to be created in advance.
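Since the output is generated, a quick sanity check before handing it to DataX can catch an unfilled conf.json or a reader/writer column mismatch. A minimal sketch, assuming the output has the same structure as datax_example.json; `check_output` is a hypothetical helper and its checks are deliberately simple.

```python
import re


def check_output(job):
    """Run a few consistency checks on a parsed datax_output.json.

    Returns a list of problem descriptions; an empty list means none of
    these simple checks found a problem.
    """
    problems = []
    content = job["job"]["content"][0]
    reader = content["reader"]["parameter"]
    writer = content["writer"]["parameter"]

    # Fields that stay empty when conf.json or the DDL was not filled in
    if not reader["username"]:
        problems.append("reader username is empty")
    if not writer["loadUrl"]:
        problems.append("writer loadUrl is empty")
    if not writer["connection"][0]["selectedDatabase"]:
        problems.append("writer selectedDatabase is empty")

    # querySql should select the same columns, in the same order, as the writer's column list
    sql = reader["connection"][0]["querySql"][0]
    m = re.match(r"\s*SELECT\s+(.*?)\s+FROM\s", sql, re.IGNORECASE | re.DOTALL)
    if not m:
        problems.append("querySql is not a simple SELECT ... FROM ...")
    else:
        selected = [c.strip().strip("`") for c in m.group(1).split(",")]
        written = [c.strip().strip("`") for c in writer["column"]]
        if selected != written:
            problems.append("reader columns %s != writer columns %s" % (selected, written))
    return problems
```

Usage: open datax_output.json, pass `json.load(f)` to `check_output`, and print the returned list.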

2.4 dorisDDL.sql

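This file holds the Doris CREATE TABLE statement. Note: qualify the table with its database name (`db`.`table`), because the DataX JSON needs the database name and the tool parses it from this statement.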

CREATE TABLE `test`.`merchant` (
  `merchant_code` VARCHAR(60) NOT NULL COMMENT 'merchant code',
  `merchant_name` VARCHAR(100) DEFAULT NULL COMMENT 'merchant name',
  `merchant_type` VARCHAR(60) DEFAULT NULL COMMENT 'merchant type'
) ENGINE = OLAP
UNIQUE KEY(`merchant_code`)
DISTRIBUTED BY HASH(`merchant_code`)   BUCKETS 20
PROPERTIES (
  "replication_num" = "1",
  "storage_type" = "COLUMN"
);
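How the database name is picked up can be sketched with the same regular expression the script uses in `parse_create_table_sql`. The `database_and_table` wrapper is hypothetical; unlike the script, it raises an error when the `db.` qualifier is missing instead of letting an empty database name reach the writer's jdbcUrl.

```python
import re

# Same pattern as in Mysql2dorisDataxTools.py: optional IF NOT EXISTS,
# optional `db`. qualifier, then the table name (backticks optional)
TABLE_PATTERN = r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(?:(`?)(\w+)\1\.)?(`?)(\w+)\3'


def database_and_table(ddl):
    """Return (database, table) from a CREATE TABLE statement.

    Raises ValueError when no database qualifier is present, because the
    writer's jdbcUrl and selectedDatabase are built from it.
    """
    m = re.search(TABLE_PATTERN, ddl, re.IGNORECASE)
    if not m:
        raise ValueError("no CREATE TABLE statement found")
    if not m.group(2):
        raise ValueError("table %s has no database prefix; write it as `db`.`table`" % m.group(4))
    return m.group(2), m.group(4)


print(database_and_table("CREATE TABLE `test`.`merchant` (`id` INT)"))  # ('test', 'merchant')
```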

2.5 Mysql2dorisDataxTools.py

This is the core code:

import json
import re

# Read a JSON file and return its content as a Python dict
def read_and_format_json(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)



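# Unquoted words that start an index definition rather than a column in the column list
INDEX_KEYWORDS = {'INDEX', 'KEY', 'PRIMARY', 'UNIQUE', 'CONSTRAINT'}


# Parse the DDL: return the database name, table name and column names
def parse_create_table_sql(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        sql = f.read()
    # Remove SQL comments (-- line comments and /* */ block comments)
    sql = re.sub(r'--.*$', '', sql, flags=re.MULTILINE)
    sql = re.sub(r'/\*.*?\*/', '', sql, flags=re.DOTALL)

    # Extract the database and table names
    table_pattern = r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(?:(`?)(\w+)\1\.)?(`?)(\w+)\3'
    table_match = re.search(table_pattern, sql, re.IGNORECASE)
    if not table_match:
        return None

    database_name = table_match.group(2)
    table_name = table_match.group(4)

    # Cut out the column list, i.e. the parenthesised block right after the table name.
    # Parentheses are counted so VARCHAR(60) or DECIMAL(10,2) do not end it early,
    # quoted text such as COMMENT '...' is skipped, and definitions are split on
    # top-level commas, so the last column (which has no trailing comma) is kept.
    start = sql.find('(', table_match.end())
    if start == -1:
        return None
    definitions, buf, depth, quote = [], [], 0, None
    for ch in sql[start:]:
        if quote:
            buf.append(ch)
            if ch == quote:
                quote = None
            continue
        if ch in ("'", '"'):
            quote = ch
        elif ch == '(':
            depth += 1
            if depth == 1:
                continue  # opening parenthesis of the column list
        elif ch == ')':
            depth -= 1
            if depth == 0:
                break  # closing parenthesis of the column list
        elif ch == ',' and depth == 1:
            definitions.append(''.join(buf))
            buf = []
            continue
        buf.append(ch)
    definitions.append(''.join(buf))

    # The column name is the first identifier of each definition;
    # unquoted INDEX/KEY/... definitions are skipped
    columns = []
    for definition in definitions:
        m = re.match(r'\s*(`?)(\w+)\1\s', definition)
        if m and (m.group(1) or m.group(2).upper() not in INDEX_KEYWORDS):
            columns.append(m.group(2))

    return {
        'database': database_name,
        'table': table_name,
        'columns': columns
    }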

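# Build the MySQL querySql. The Doris table name carries table_prefix,
# so only that leading prefix is stripped to get the MySQL table name
def get_select(columns, table_name, tablename_prefix):
    columns = [s.strip() for s in columns]
    if tablename_prefix and table_name.startswith(tablename_prefix):
        table_name = table_name[len(tablename_prefix):]
    return 'SELECT ' + ','.join('`' + c + '`' for c in columns) + ' FROM `' + table_name + '`'


# Writer column list: the same columns wrapped in backticks
def get_column(columns):
    return ['`' + c + '`' for c in columns]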


conf = read_and_format_json('conf.json')
js_demo = read_and_format_json('datax_example.json')
ddl_info = parse_create_table_sql('dorisDDL.sql')
# The writer needs the database name, so the DDL must use `db`.`table`
if ddl_info is None or not ddl_info['database'] or not ddl_info['columns']:
    raise SystemExit('could not parse `db`.`table` and its columns from dorisDDL.sql')
select_sql = get_select(ddl_info['columns'], ddl_info['table'], conf['mysql']['table_prefix'])
column = get_column(ddl_info['columns'])

# Reader section (MySQL)
reader = js_demo['job']['content'][0]['reader']['parameter']
reader['username'] = conf['mysql']['user']
reader['password'] = conf['mysql']['password']
reader['connection'][0]['querySql'] = [select_sql]
jdbc_url_mysql = ('jdbc:mysql://' + conf['mysql']['host'] + ':' + str(conf['mysql']['port'])
                  + '/' + conf['mysql']['database'] + '?useSSL=false&serverTimezone=Asia/Shanghai')
reader['connection'][0]['jdbcUrl'] = [jdbc_url_mysql]

# Writer section (Doris)
writer = js_demo['job']['content'][0]['writer']['parameter']
writer['username'] = conf['doris']['user']
writer['password'] = conf['doris']['password']
writer['loadUrl'] = conf['doris']['loadUrl']
writer['column'] = column
writer['preSql'] = conf['doris']['preSql']
writer['connection'][0]['table'] = [ddl_info['table']]
jdbc_url_doris = 'jdbc:mysql://' + conf['doris']['host'] + ':' + str(conf['doris']['port']) + '/' + ddl_info['database']
writer['connection'][0]['jdbcUrl'] = jdbc_url_doris
writer['connection'][0]['selectedDatabase'] = ddl_info['database']

print(json.dumps(js_demo, ensure_ascii=False))

# Write the job file
with open('datax_output.json', 'w', encoding='utf-8') as file:
    json.dump(js_demo, file, ensure_ascii=False, indent=4)
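The script fills the loaded template in place and produces one job per run. If you want to generate jobs for many tables in one go, which is the motivation in the overview, one option is to wrap the same field assignments in a function that deep-copies the template first. A sketch, assuming a `ddl_info` dict shaped like what `parse_create_table_sql` returns; `build_job` is a hypothetical helper, not part of the original tool.

```python
import copy


def build_job(template, conf, ddl_info):
    """Return a filled-in DataX job for one table without mutating `template`.

    Mirrors the reader/writer assignments in Mysql2dorisDataxTools.py.
    """
    job = copy.deepcopy(template)
    mysql, doris = conf["mysql"], conf["doris"]
    prefix = mysql.get("table_prefix", "")
    table = ddl_info["table"]
    # MySQL table = Doris table name without the leading prefix
    src_table = table[len(prefix):] if prefix and table.startswith(prefix) else table
    cols = ["`" + c + "`" for c in ddl_info["columns"]]

    reader = job["job"]["content"][0]["reader"]["parameter"]
    reader["username"] = mysql["user"]
    reader["password"] = mysql["password"]
    reader["connection"][0]["querySql"] = ["SELECT " + ",".join(cols) + " FROM `" + src_table + "`"]
    reader["connection"][0]["jdbcUrl"] = [
        "jdbc:mysql://%s:%s/%s?useSSL=false&serverTimezone=Asia/Shanghai"
        % (mysql["host"], mysql["port"], mysql["database"])
    ]

    writer = job["job"]["content"][0]["writer"]["parameter"]
    writer["username"] = doris["user"]
    writer["password"] = doris["password"]
    writer["loadUrl"] = doris["loadUrl"]
    writer["column"] = cols
    writer["preSql"] = doris["preSql"]
    conn = writer["connection"][0]
    conn["table"] = [table]
    conn["jdbcUrl"] = "jdbc:mysql://%s:%s/%s" % (doris["host"], doris["port"], ddl_info["database"])
    conn["selectedDatabase"] = ddl_info["database"]
    return job
```

Calling it in a loop over several parsed DDL files and writing each result to its own file (for example a hypothetical `datax_<db>_<table>.json` naming scheme) avoids every run overwriting datax_output.json.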

3. Usage

Put all of the files above in one directory and run Mysql2dorisDataxTools.py from that directory.
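Because the script opens conf.json, datax_example.json and dorisDDL.sql by relative path, it has to be run with that directory as the working directory. A hypothetical pre-flight check along these lines reports which inputs are missing (datax_output.json is created by the script, so it is not listed):

```python
import os

# Input files Mysql2dorisDataxTools.py opens by relative path
REQUIRED_FILES = ["conf.json", "datax_example.json", "dorisDDL.sql"]


def missing_inputs(directory="."):
    """Return the required input files that are not present in `directory`."""
    return [name for name in REQUIRED_FILES
            if not os.path.isfile(os.path.join(directory, name))]
```

Once the script has written datax_output.json, the job is run with DataX in the usual way, e.g. `python {DATAX_HOME}/bin/datax.py datax_output.json`.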

If you would rather not type it all in, just use the archive below.

https://download.csdn.net/download/weixin_45399602/89644991?spm=1001.2014.3001.5503

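Disclaimer: This article was contributed by a user and does not represent the position of the wpsshop blog; copyright belongs to the original author, and this site assumes no related legal liability. If you find infringing content, please contact us. When reposting, please credit the source: https://www.wpsshop.cn/w/盐析白兔/article/detail/1018492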