赞
踩
生产环境需求-sqlserver数据库数据迁移至mysql
DataX 源码链接: https://gitee.com/mirrors/DataX.git
sqlserver2mysql.json
{ "job": { "content": [{ "reader": { "name": "sqlserverreader", "parameter": { "username": "", "password": "", "column": ["*"], "connection": [{ "table": ["$readTable"], "jdbcUrl": ["jdbc:sqlserver://127.0.0.1:1058;DatabaseName=database"] }], "fetchSize": 20000 } }, "writer": { "name": "mysqlwriter", "parameter": { "username": "", "password": "", "writeMode": "insert", "column": ["*"], "connection": [{ "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/database", "table": ["$writeTable"] }], "batchSize":20000, "ignoreTagsUnmatched": true } } }], "setting": { "speed": { "channel": "64" } } } }
其中 $readTable 和 $writeTable 是占位参数,用于在执行时自动传入实际的表名
# coding:utf-8
"""Run one DataX sqlserver -> mysql job per table, at most MAX_WORKERS at a time.

Each table pair is handed to ``transfer``, which shells out to ``datax.py``
with the shared ``sqlserver2mysql.json`` job file and passes the table names
through ``-DreadTable`` / ``-DwriteTable``.
"""
import os
import time
from concurrent.futures import ThreadPoolExecutor

# Pool size and the cap on jobs that may be unfinished at any moment.
MAX_WORKERS = 16
# Seconds to wait between two submissions and between two status reports.
POLL_SECONDS = 8


def transfer(table1, table2):
    """Synchronise one table by running DataX as a child process.

    :param table1: source table name, substituted for ``$readTable``
    :param table2: target table name, substituted for ``$writeTable``
    :return: the value returned by ``os.system`` for the DataX command
    """
    print("开始同步表:", table1)
    exit_code = os.system(
        'python ' + 'D:\\javacode\\DataX\\target\\datax\\datax\\bin\\datax.py '
        + 'D:\\javacode\\DataX\\target\\datax\\datax\\bin\\sqlserver2mysql.json -p \"-DreadTable='
        + table1 + ' -DwriteTable=' + table2 + '\"')
    print("同步表结束:", table1)
    return exit_code


# Thread pool with MAX_WORKERS (16) worker threads.
pool = ThreadPoolExecutor(max_workers=MAX_WORKERS)

# Tables to synchronise; the two lists are paired by index and must have the
# same length (source name in SQL Server quoting, target name in MySQL quoting).
table_list1 = ['[table1]', '[table2]']
table_list2 = ['`table1`', '`table2`']

all_task = []
task_dic = {}
i = 0


def _submit(index):
    """Submit the table pair at ``index`` to the pool and record its future."""
    print("启动线程:", index)
    task = pool.submit(transfer, table_list1[index], table_list2[index])
    all_task.append(task)
    task_dic[task] = table_list1[index]


# Warm-up: start up to MAX_WORKERS jobs, one every POLL_SECONDS. The bound on
# ``i`` keeps a table list shorter than the pool from running off the end.
while i < len(table_list1) and len(all_task) < MAX_WORKERS:
    _submit(i)
    i += 1
    time.sleep(POLL_SECONDS)

# Steady state: top the pool up by one job per poll and report progress until
# every table has been submitted and every job has finished.
while True:
    # ``not done()`` also counts futures that were submitted but have not been
    # picked up by a worker yet, so the loop cannot exit while work is queued.
    unfinished = [task_dic[future] for future in all_task if not future.done()]
    if i < len(table_list1) and len(unfinished) < MAX_WORKERS:
        _submit(i)
        unfinished.append(table_list1[i])
        i += 1
    if not unfinished:
        break
    print("当前运行任务数:", len(unfinished))
    print("当前正在同步表:", unfinished)
    time.sleep(POLL_SECONDS)

print("关闭线程池")
pool.shutdown()
执行命令就是 python ceshi.py
package com.cyboil.logs; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; /** * @author scf */ public class LogAnalysis { public static void main(String[] args) { List<LogDto> result = new ArrayList<>(); List<LogDto> failedList = new ArrayList<>(); String path = "D:\\项目资料\\数据迁移日志\\2023-02-02 - 副本"; getLog(path, result, failedList); System.out.println("-------------------------------------------result-----------------------------------------------"); System.out.println(result.size()); for (LogDto logDto : result) { System.out.println(logDto.toString()); } List<String> collect = result.stream().map(LogDto::getTable).collect(Collectors.toList()); String resultTables = String.join("','", collect); String replace = resultTables.replace("[", "`").replace("]", "`"); System.out.println(result.size()); System.out.println(resultTables); System.out.println(replace); System.out.println("-------------------------------------------failedList-----------------------------------------------"); System.out.println(failedList.size()); for (LogDto logDto : failedList) { System.out.println(logDto.toString()); } collect = failedList.stream().map(LogDto::getTable).collect(Collectors.toList()); String failedTables = String.join("','", collect); replace = failedTables.replace("[", "`").replace("]", "`"); System.out.println(failedList.size()); System.out.println(failedTables); System.out.println(replace); } // public static void main(String[] args) { // List<LogDto> result = new ArrayList<>(); // List<LogDto> failedList = new ArrayList<>(); // String path = "D:\\项目资料\\数据迁移日志\\2023-02-01"; // getLog(path, result, failedList); // //本次同步所有表 // List<String> allTableList = result.stream().map(LogDto::getTable).collect(Collectors.toList()); // 
System.out.println("-------------------------------------------BIG_TABLE-----------------------------------------------"); // List<LogDto> logDtoList = result.stream().filter(l -> l.getTimeConsuming() != null && l.getTimeConsuming() > 10800).collect(Collectors.toList()); // for (LogDto logDto : logDtoList) { // System.out.println(logDto.toString()); // } // List<String> bigTableList = logDtoList.stream().map(LogDto::getTable).collect(Collectors.toList()); // //获取耗时长的表 // String bigTables = String.join("','", bigTableList); // String replace = bigTables.replace("[", "`").replace("]", "`"); // System.out.println(bigTableList.size()); // System.out.println(bigTables); // System.out.println(replace); // //空结果表 // System.out.println("-------------------------------------------NULL_TABLE-----------------------------------------------"); // List<LogDto> nullTableList = result.stream().filter(l -> l.getTimeConsuming() == null).collect(Collectors.toList()); // for (LogDto logDto : nullTableList) { // System.out.println(logDto.toString()); // } // List<String> nullList = nullTableList.stream().map(LogDto::getTable).collect(Collectors.toList()); // String collect1 = String.join("','", nullList); // String replace1 = collect1.replace("[", "`").replace("]", "`"); // System.out.println(nullList.size()); // System.out.println(collect1); // System.out.println(replace1); // System.out.println("-------------------------------------------NORMAL_TABLE-----------------------------------------------"); // List<String> normalList = new ArrayList<>(); // //获取所有表 // for (String s : allTableList) { // if (!bigTableList.contains(s) && !nullList.contains(s)) { // normalList.add(s); // } // } // String normalTables = String.join("','", normalList); // replace = normalTables.replace("[", "`").replace("]", "`"); // System.out.println(normalList.size()); // System.out.println(normalTables); // System.out.println(replace); // //失败表 // System.out.println("------------------------------------------ 
FAILED_TABLE----------------------------------------------"); // String failedTables = failedList.stream().map(LogDto::getTable).collect(Collectors.joining("','")); // replace = failedTables.replace("[", "`").replace("]", "`"); // System.out.println(failedList.size()); // System.out.println(failedTables); // System.out.println(replace); // // } private static void getLog(String path, List<LogDto> result, List<LogDto> failedList) { String[] fileNames = getFileNames(path); for (String fileName : fileNames) { File file = new File(path + "\\" + fileName); BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(file)); LogDto logDto = new LogDto(); logDto.setFileName(file.getName()); String tempStr; int i = 0; while ((tempStr = reader.readLine()) != null) { if (i == 41) { logDto.setTable(tempStr.trim().replace("\"", "")); } if (tempStr.contains("回滚此次写入")) { logDto.setIsSingleLine("1"); String trim = tempStr.substring(tempStr.indexOf("因为:") + 1).trim(); logDto.setBecause(trim); } else if (tempStr.contains("任务启动时刻")) { String startTime = tempStr.substring(tempStr.indexOf(":") + 1).trim(); logDto.setStartTime(startTime); } else if (tempStr.contains("任务结束时刻")) { String endTime = tempStr.substring(tempStr.indexOf(":") + 1).trim(); logDto.setEndTime(endTime); } else if (tempStr.contains("任务总计耗时")) { String timeConsuming = tempStr.split(":")[1].trim().replace("s", ""); logDto.setTimeConsuming(Long.parseLong(timeConsuming)); } else if (tempStr.contains("记录写入速度")) { String writeSpeed = tempStr.split(":")[1].trim(); logDto.setWriteSpeed(writeSpeed); } else if (tempStr.contains("读出记录总数")) { String total = tempStr.split(":")[1].trim(); logDto.setTotal(total); } else if (tempStr.contains("读写失败总数")) { String failed = tempStr.split(":")[1].trim(); logDto.setFailed(failed); } i++; } if ("1".equals(logDto.getIsSingleLine())) { failedList.add(logDto); } else { result.add(logDto); } reader.close(); } catch (IOException e) { e.printStackTrace(); } finally { if 
(reader != null) { try { reader.close(); } catch (IOException e1) { e1.printStackTrace(); } } } } } /** * 得到文件名称 * * @param path 路径 * @return {@link List}<{@link String}> */ private static String[] getFileNames(String path) { File file = new File(path); if (!file.exists()) { return null; } return file.list(); } private static String[] getTables() { String[] arr = {"[table1]"}; return arr; } private static String[] getForeignTables() { String[] arr = {"[table2]"}; return arr; } // public static void main(String[] args) { // List<String> noForeign = new ArrayList<>(); // List<String> foreign = new ArrayList<>(); // String[] allTables = getTables(); // List<String> foreignTables = Arrays.asList(getForeignTables()); // for (String table : allTables) { // if (foreignTables.contains(table)) { // foreign.add(table); // } else { // noForeign.add(table); // } // } // String collect = String.join("','", noForeign); // String replace = collect.replace("[", "`").replace("]", "`"); // System.out.println(noForeign.size()); // System.out.println(collect); // System.out.println(replace); // System.out.println("------------------------------------------------------------------------------------------"); // System.out.println(foreign.size()); // collect = String.join("','", foreign); // replace = collect.replace("[", "`").replace("]", "`"); // System.out.println(collect); // System.out.println(replace); // // } }
创建实体类对象 LogDto
package com.cyboil.logs; /** * @author scf */ public class LogDto { /** * 表格 */ private String table; /** * 是一行 */ private String isSingleLine = "0"; /** * 任务启动时刻 */ private String startTime; /** * 任务结束时刻 */ private String endTime; /** * 任务总计耗时 */ private Long timeConsuming; /** * 记录写入速度 */ private String writeSpeed; /** * 读出记录总数 */ private String total; /** * 读写失败总数 */ private String failed; /** * 原因 */ private String because; /** * 文件名称 */ private String fileName; public String getBecause() { return because; } public void setBecause(String because) { this.because = because; } public String getFileName() { return fileName; } public void setFileName(String fileName) { this.fileName = fileName; } public String getTable() { return table; } public void setTable(String table) { this.table = table; } public String getIsSingleLine() { return isSingleLine; } public void setIsSingleLine(String isSingleLine) { this.isSingleLine = isSingleLine; } public String getStartTime() { return startTime; } public void setStartTime(String startTime) { this.startTime = startTime; } public String getEndTime() { return endTime; } public void setEndTime(String endTime) { this.endTime = endTime; } public Long getTimeConsuming() { return timeConsuming; } public void setTimeConsuming(Long timeConsuming) { this.timeConsuming = timeConsuming; } public String getWriteSpeed() { return writeSpeed; } public void setWriteSpeed(String writeSpeed) { this.writeSpeed = writeSpeed; } public String getTotal() { return total; } public void setTotal(String total) { this.total = total; } public String getFailed() { return failed; } public void setFailed(String failed) { this.failed = failed; } @Override public String toString() { return complement(table, 35) + complement(isSingleLine, 5) + complement(startTime, 24) + complement(endTime, 24) + complement(String.valueOf(timeConsuming), 12) + complement(writeSpeed, 18) + complement(total, 12) + complement(failed, 12) + complement(fileName, 40) + 
complement(because, 100); } private String complement(String str, int i) { if (str == null) { return "null" + getSpace(i - "null".length()); } else { return str + getSpace(i - str.length()); } } private String getSpace(int i) { StringBuilder space = new StringBuilder(); for (int j = 0; j < i; j++) { space.append(" "); } return space.toString(); } }
本次数据迁移涉及的表数据量达到亿级,只是时间上不是很友好;如果服务器允许,多开线程可以提高些许速度
MySQL 查询含有外键的表的 SQL
-- List every foreign-key column of the given schema. In KEY_COLUMN_USAGE the
-- REFERENCED_* columns are NULL for constraints that are not foreign keys, so
-- the filter tests for NULL explicitly instead of comparing against ''.
select *
from information_schema.KEY_COLUMN_USAGE
where TABLE_SCHEMA = '数据库名称'
  and REFERENCED_TABLE_NAME is not null;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。