当前位置:   article > 正文

mysql建表语句转为doris建表语句_mysql转doris

mysql转doris

mysql同步到doris建表语句批量生成

适配doris语法,自动选择key,替换顺序,类型转换,选择数据模型等

一、生成配置文件并导出

配置文件内容格式

| 源库 | 源表 | doris库 | doris表 | 备注 |
| --- | --- | --- | --- | --- |
| test | order | ods_dev | ods_order_df | 订单表 |
| test | user | ods_dev | ods_user_df | 用户表 |
  1. test,order,ods_dev,ods_order_df,订单表
  2. test,user,ods_dev,ods_user_df,用户表

二、执行mysql_to_doris.py脚本

  1. import pymysql
  class ColumnEntity:
      """Metadata describing a single MySQL column.

      Attributes:
          column_name: Name of the column.
          ordinal_position: Position of the column within its table.
          data_type: Bare MySQL type name, e.g. ``'varchar'``.
          character_maximum_length: Declared character length, if any.
          column_type: Full MySQL type declaration, e.g. ``'decimal(10,2)'``.
          column_key: Key marker; ``'PRI'`` denotes a primary-key column.
          column_comment: Comment attached to the column.
      """

      def __init__(self, column_name, ordinal_position, data_type, character_maximum_length, column_type, column_key,
                   column_comment):
          self.column_name = column_name
          self.ordinal_position = ordinal_position
          self.data_type = data_type
          self.character_maximum_length = character_maximum_length
          self.column_type = column_type
          self.column_key = column_key
          self.column_comment = column_comment

      def __repr__(self):
          # Attributes are listed in assignment order, which mirrors __init__.
          fields = ', '.join('{}={!r}'.format(name, value) for name, value in vars(self).items())
          return '{}({})'.format(type(self).__name__, fields)
  class TableInfoEntity:
      """Mapping of one MySQL source table to its Doris target table.

      Attributes:
          mysql_db: Source MySQL database name.
          mysql_table: Source MySQL table name.
          doris_db: Target Doris database name.
          doris_table: Target Doris table name.
          comment: Free-text table comment.
      """

      def __init__(self, mysql_db, mysql_table, doris_db, doris_table, comment):
          self.mysql_db = mysql_db
          self.mysql_table = mysql_table
          self.doris_db = doris_db
          self.doris_table = doris_table
          self.comment = comment

      def __repr__(self):
          # Attributes are listed in assignment order, which mirrors __init__.
          fields = ', '.join('{}={!r}'.format(name, value) for name, value in vars(self).items())
          return '{}({})'.format(type(self).__name__, fields)
  def info_config(path='C:\\test\\test.txt'):
      """Load the source-to-target table mapping from the configuration file.

      Every non-blank line holds five comma-separated fields:
      ``mysql_db,mysql_table,doris_db,doris_table,comment``.

      Args:
          path: Location of the configuration file. Defaults to the path that
              was previously hard-coded.

      Returns:
          A dict mapping ``'<mysql_db>.<mysql_table>'`` to a TableInfoEntity.

      Raises:
          ValueError: If a non-blank line has fewer than five fields.
      """
      info_map = {}
      # 'utf-8-sig' reads plain UTF-8 and also drops a leading BOM, which would
      # otherwise end up glued to the first database name. The context manager
      # closes the file even when a line fails to parse.
      with open(path, mode='r', encoding='utf-8-sig') as config_file:
          for line_no, raw_line in enumerate(config_file, start=1):
              line = raw_line.strip()
              if not line:
                  continue
              # maxsplit=4 keeps any commas inside the trailing comment field.
              fields = [field.strip() for field in line.split(',', 4)]
              if len(fields) < 5:
                  raise ValueError(
                      'line {} of {!r} needs 5 comma-separated fields, got {}: {!r}'.format(
                          line_no, path, len(fields), line))
              mysql_db, mysql_table, doris_db, doris_table, comment = fields
              key = mysql_db + '.' + mysql_table
              info_map[key] = TableInfoEntity(mysql_db, mysql_table, doris_db, doris_table, comment)
      return info_map
  def table_column_info(schemas=('test',), host='localhost', port=3306, user='root', passwd='123456'):
      """Read column metadata for the given MySQL schemas.

      Args:
          schemas: Schema (database) names to inspect; a single name may be
              passed as a plain string.
          host: MySQL host.
          port: MySQL port.
          user: MySQL user.
          passwd: MySQL password.

      Returns:
          A dict mapping ``'<table_schema>.<table_name>'`` to a list of
          ColumnEntity objects ordered by ordinal position.
      """
      table_map = {}
      schema_names = [schemas] if isinstance(schemas, str) else list(schemas)
      if not schema_names:
          # An empty "in ()" list is not valid SQL; nothing to look up anyway.
          return table_map

      # One placeholder per schema lets the driver do the quoting.
      placeholders = ', '.join(['%s'] * len(schema_names))
      # The explicit ordering guarantees each table's columns arrive in
      # declaration order, so the first list element is the first column.
      sql = ("select table_schema,table_name,column_name,ordinal_position,data_type,character_maximum_length,"
             "column_type,column_key,column_comment from information_schema.columns "
             "where table_schema in ({}) "
             "order by table_schema,table_name,ordinal_position").format(placeholders)

      connection = pymysql.connect(host=host, port=port, user=user, passwd=passwd)
      try:
          cursor = connection.cursor()
          try:
              cursor.execute(sql, schema_names)
              rows = cursor.fetchall()
          finally:
              cursor.close()
      finally:
          # Release the connection even if the query fails.
          connection.close()

      for row in rows:
          key = row[0] + "." + row[1]
          table_map.setdefault(key, []).append(ColumnEntity(*row[2:9]))
      return table_map
  # MySQL types that are replaced wholesale by a Doris-compatible type.
  _DORIS_TYPE_MAP = {
      'tinytext': 'string',
      'text': 'string',
      'mediumtext': 'string',
      'longtext': 'string',
      'tinyblob': 'string',
      'blob': 'string',
      'mediumblob': 'string',
      'longblob': 'string',
      'enum': 'string',
      'set': 'string',
      'binary': 'string',
      'varbinary': 'string',
      'mediumint': 'int',
      'year': 'varchar(64)',
      'time': 'varchar(64)',
      'bit': 'char(10)',
  }


  def mysql_type_convert(data_type, character_maximum_length, column_type):
      """Translate a MySQL column type into a Doris-compatible type string.

      Args:
          data_type: Bare MySQL type name, e.g. ``'varchar'``.
          character_maximum_length: Declared character length; only read for
              ``char`` and ``varchar``.
          column_type: Full MySQL declaration, e.g. ``'decimal(10,2) unsigned'``.

      Returns:
          The Doris type to use in the generated DDL. Types without a special
          rule are returned unchanged.
      """
      if data_type in ('char', 'varchar'):
          # Lengths below 100 are multiplied by 6, all others by 3
          # (presumably headroom for multi-byte characters; confirm with owner).
          factor = 6 if character_maximum_length < 100 else 3
          length = character_maximum_length * factor
          if length > 65533:
              length = 65530
          # Doris CHAR holds at most 255, so an enlarged char that no longer
          # fits is emitted as varchar instead of producing invalid DDL.
          if data_type == 'char' and length <= 255:
              return 'char({})'.format(length)
          return 'varchar({})'.format(length)

      # First token of the declaration: keeps precision/scale but drops
      # MySQL-only attributes such as "unsigned" or "zerofill".
      declared = (column_type or '').split()
      declared = declared[0] if declared else data_type

      if data_type in ('datetime', 'decimal'):
          # These types carry precision that must survive the conversion.
          return declared
      if data_type == 'timestamp':
          # Same rule as datetime: keep the fractional-second precision.
          if declared.startswith('timestamp('):
              return 'datetime' + declared[len('timestamp'):]
          return 'datetime'

      # Exact-match lookup; anything unlisted passes through untouched.
      return _DORIS_TYPE_MAP.get(data_type, data_type)
  def batch_mysql_to_doris(info_map, table_map, out_path='C:\\test\\doris_create.sql'):
      """Append a Doris ``create table`` statement for every configured table.

      For each table that is found, a ``truncate table`` statement for the
      Doris target is also printed to stdout.

      Args:
          info_map: Dict of ``'<mysql_db>.<mysql_table>'`` to TableInfoEntity.
          table_map: Dict of the same keys to lists of ColumnEntity.
          out_path: File the statements are appended to. Defaults to the path
              that was previously hard-coded.
      """
      # Explicit UTF-8 so the Chinese comments are written the same way on
      # every platform; the context manager closes the file on any error.
      with open(out_path, mode='a', encoding='utf-8') as out_file:
          for key, info_entity in info_map.items():
              column_list = table_map.get(key)
              if not column_list:
                  print("配置文件有问题,获取不到对应的表 key:{}".format(key))
                  continue
              print("truncate table " + info_entity.doris_db + "." + info_entity.doris_table + ";")
              out_file.write(_build_create_sql(info_entity, column_list))


  def _escape_sql_text(text, quote):
      """Escape *text* for a SQL string literal delimited by *quote*."""
      return (text or '').replace('\\', '\\\\').replace(quote, '\\' + quote)


  def _build_create_sql(info_entity, column_list):
      """Render the Doris DDL for one table from its MySQL column metadata."""
      # Do not rely on the caller's ordering: the duplicate-key fallback below
      # needs the table's real first column.
      columns = sorted(column_list, key=lambda column: column.ordinal_position)

      key_lines = []
      other_lines = []
      key_names = []
      for column in columns:
          name = '`' + column.column_name + '`'
          # Convert the type so that Doris accepts it.
          doris_type = mysql_type_convert(column.data_type, column.character_maximum_length, column.column_type)
          line = "{} {} comment '{}',".format(name, doris_type, _escape_sql_text(column.column_comment, "'"))
          if column.column_key == 'PRI':
              key_lines.append(line)
              key_names.append(name)
          else:
              other_lines.append(line)

      # Primary-key columns are moved to the front, keeping their original
      # relative order, followed by the remaining columns.
      body = key_lines + other_lines
      # Two bookkeeping columns are appended to every table.
      body.append("data_source varchar(500) comment '数据来源',")
      body.append("insert_time datetime comment '数据插入时间'")

      # Tables with a primary key use the unique model; tables without one use
      # the duplicate model keyed on their first column.
      if key_names:
          model = 'unique'
          key_clause = ','.join(key_names)
      else:
          model = 'duplicate'
          key_clause = '`' + columns[0].column_name + '`'
      end = [
          '{} key({})'.format(model, key_clause),
          'comment "{}"'.format(_escape_sql_text(info_entity.comment, '"')),
          'distributed by hash({}) buckets 10;'.format(key_clause),
      ]

      head = 'create table if not exists {}.{} ('.format(info_entity.doris_db, info_entity.doris_table)
      return head + '\n' + '\n'.join(body) + '\n)\n' + '\n'.join(end) + '\n'
  if __name__ == '__main__':
      # Load the table mapping first, then the MySQL column metadata,
      # and finally generate the Doris DDL from both.
      info_map, table_map = info_config(), table_column_info()
      batch_mysql_to_doris(info_map, table_map)

三、执行结果

会在本地生成doris建表语句的文件

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/973199
推荐阅读
相关标签
  

闽ICP备14008679号