赞
踩
最近公司有个需求,要用etl迁移几十上百个表的数据到目标数据库,
由于不是直接照搬过去,而是要对字段进行一些修改和添加,
故需要用etl脚本来进行数据的迁移,通过了解,我使用了kettle可视化工具进行了etl脚本的书写。
下面是我的一些使用体验。 这次的任务量比较的大,
由于博主本人是一枚21届的毕业生,初入Java开发这个行业,更是第一次接触到etl这个东西,
所以写的不好的地方请多多见谅,谢谢大家。 首先数据迁移我这边是需要三个表,
一个源表,一个目标表,还有一个记录时间戳的表
首先创建一个转换,分别是两个表输入和一个插入更新。第一步从时间戳表获取到上次记录的时间,然后在第二步查询的时候大于这个时间,这样就实现了增量更新所有的数据,然后执行插入更新操作就可以了。
时间戳的转换就是首先去目标表里面查询当前数据最大的时间,然后记录到时间戳表
作业就是这样
由于有很多作业,所以就弄了这样一个并发
开启这个run next 就可以开启并发了
如果数据量比较大,这个记录数可以调大一点,当然要注意自己分配的内存,设置太大了可能就oom了。。不要问我为什么知道哈哈
如果迁移百万千万级数据的话这个提交记录数量也可以调大一点,最大可以调到50000,然后插入更新的话可以选查询的关键字,我是用的id作为标识,然后设置id更新为N这样它的id就不会变,如果一样就只会执行更新操作了。
kettlle迁移数据库还是比较简单的,没什么好多说的,
可能最大的影响就是效率问题,可以优化的地方比较多,我这里就不做赘述了,有需求的可以去搜一搜教程。
这篇文章主要讲的是数据迁移的思路和如果自动生成这个脚本。
下面我就来说说如何用Java代码来自动生成这些脚本,当时做的时候还是比较费力的,
网上的教程能搜到的比较少,所以分享一些自己的经验出来,避免大家走弯路。
private DatabaseMeta xml2DatabaseMeta(DateBaseInfo dateBaseInfo) throws KettleXMLException {
String args = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<connection>" +
"<name>%s</name>" +
"<server>%s</server>" +
"<type>%s</type>" +
"<access>%s</access>" +
"<database>%s</database>" +
"<port>%s</port>" +
"<username>%s</username>" +
"<password>%s</password>" +
"</connection>";
return new DatabaseMeta(String.format(args, dateBaseInfo.getName(), dateBaseInfo.getServer(), dateBaseInfo.getType(), dateBaseInfo.getAccess(),
dateBaseInfo.getDatabase(), dateBaseInfo.getPort(), dateBaseInfo.getUserName(), dateBaseInfo.getPassword()));
}
这个是数据库连接的代码
public StepMeta getTableInput(DatabaseMeta dataBase, String sql, String stepName, int x, int y) {
TableInputMeta tableInput = new TableInputMeta();
tableInput.setDefault();
tableInput.setDatabaseMeta(dataBase);
tableInput.setSQL(sql);
tableInput.setVariableReplacementActive(true);
StepMeta stepMeta = new StepMeta("TableInput", stepName, tableInput);
stepMeta.setDraw(true);
stepMeta.setLocation(x, y);
return stepMeta;
}
生成表输入
public StepMeta getInsertUpdate(DatabaseMeta dataBase, String tableName, String[] keyLookup, String[] keyStream,String[] keyStream2, String[] keyCondition, String[] updatelookup, String[] updateStream, Boolean[] updateOrNot, String stepName, int x, int y) { InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta(); insertUpdateMeta.setDefault(); insertUpdateMeta.setCommitSize("20000"); insertUpdateMeta.setDatabaseMeta(dataBase); insertUpdateMeta.setTableName(tableName); //设置用来查询的关键字 insertUpdateMeta.setKeyLookup(keyLookup); insertUpdateMeta.setKeyStream(keyStream); insertUpdateMeta.setKeyStream2(keyStream2);//一定要加上 insertUpdateMeta.setKeyCondition(keyCondition); //设置要更新的字段 insertUpdateMeta.setUpdateLookup(updatelookup); insertUpdateMeta.setUpdateStream(updateStream); insertUpdateMeta.setUpdate(updateOrNot); String[] lookup = insertUpdateMeta.getUpdateLookup(); //添加步骤到转换中 StepMeta stepMeta = new StepMeta("InsertUpdate", stepName, insertUpdateMeta); stepMeta.setDraw(true); stepMeta.setLocation(x, y); return stepMeta; }
插入更新
public StepMeta getMqtt(String server, String userName, String passWord, String clientId, String topic, String stepName, int x, int y) {
MQTTProducerMetaV2 mqttProducerMetaV2 = MQTTProducerMetaV2.builder()
.mqttServer(server)
.username(userName)
.password(passWord)
.clientId(clientId)
.topic(topic).build();
mqttProducerMetaV2.setMessageField("outputValue");
StepMeta stepMeta = new StepMeta("MQTTProducer", stepName, mqttProducerMetaV2);
stepMeta.setDraw(true);
stepMeta.setLocation(x, y);
return stepMeta;
}
mqtt ,需要注意的一点是他生成mqtt的源代码类MQTTProducerMeta 不支持有的数据的改变,所以我把它拿出来自己进行了一层封装,让他支持修改一些属性。
public StepMeta getJsonOutPut(JsonOutputField[] jsonOutputFields, String stepName, int x, int y) {
JsonOutputMeta jsonOutputMeta = new JsonOutputMeta();
jsonOutputMeta.setDefault();
jsonOutputMeta.setOperationType(jsonOutputMeta.OPERATION_TYPE_OUTPUT_VALUE);
jsonOutputMeta.setNrRowsInBloc("100000000");
jsonOutputMeta.setOutputFields(jsonOutputFields);
StepMeta stepMeta = new StepMeta("JsonOutput", stepName, jsonOutputMeta);
stepMeta.setDraw(true);
stepMeta.setLocation(x, y);
return stepMeta;
}
json转换
public StepMeta getsetVariable(String[] variablefieldNames, String[] variableVariabeldNames, int[] variableTypes, String stepName, int x, int y) {
SetVariableMeta setVariableMeta = new SetVariableMeta();
setVariableMeta.setDefault();
setVariableMeta.setFieldName(variablefieldNames);
setVariableMeta.setVariableName(variableVariabeldNames);
setVariableMeta.setVariableType(variableTypes);
setVariableMeta.setDefaultValue(new String[]{"", ""});
StepMeta stepMeta = new StepMeta("SetVariable", stepName, setVariableMeta);
stepMeta.setDraw(true);
stepMeta.setLocation(x, y);
return stepMeta;
}
这个是设置变量,将变量设置成什么格式,生效范围之类的
public StepMeta getSelectVaues(String feild, String dataType, String stepName, int x, int y) {
SelectValuesMeta selectValuesMeta = new SelectValuesMeta();
SelectMetadataChange selectMetadataChange = new SelectMetadataChange(selectValuesMeta, feild, null,
ValueMetaInteger.TYPE_STRING, -2, -2, -1, dataType, null, null, null);
SelectMetadataChange[] selectMetadataChanges = {selectMetadataChange};
selectValuesMeta.setMeta(selectMetadataChanges);
StepMeta stepMeta = new StepMeta("SelectValues", stepName, selectValuesMeta);
stepMeta.setDraw(true);
stepMeta.setLocation(x, y);
return stepMeta;
}
这个是系统时间转换,一个参数传的是变量,一个传的是转化成什么类型
public StepMeta getSystemData(String[] fieldNames, SystemDataTypes[] fieldTypes, String stepName, int x, int y) {
SystemDataMeta systemDataMeta = new SystemDataMeta();
systemDataMeta.setFieldName(fieldNames);
systemDataMeta.setFieldType(fieldTypes);
StepMeta stepMeta = new StepMeta("SystemInfo", stepName, systemDataMeta);
stepMeta.setDraw(true);
stepMeta.setLocation(x, y);
return stepMeta;
}
获取系统时间,我这里获取到系统当前时间, fieldNames传的是{“NOW”},fieldTypes传的是{SystemDataTypes.TYPE_SYSTEM_INFO_SYSTEM_DATE}
private JobMeta generateJobs(String transFileName1, String transFileName2) { JobMeta jobMeta = new JobMeta(); jobMeta.setName(transFileName1+"_job"); //开始按钮 JobEntrySpecial jobEntrySpecial = new JobEntrySpecial(); jobEntrySpecial.setStart(true); jobEntrySpecial.setName("start"); //可以配置定时功能 jobEntrySpecial.setRepeat(true); jobEntrySpecial.setSchedulerType(JobEntrySpecial.DAILY); jobEntrySpecial.setHour(1); JobEntryCopy specialCopy = new JobEntryCopy(jobEntrySpecial); specialCopy.setDrawn(true); specialCopy.setLocation(100, 300); jobMeta.addJobEntry(specialCopy); //获取时间转换 JobEntryTrans dateEntryTrans = new JobEntryTrans(); dateEntryTrans.setName("date_migration"); dateEntryTrans.setFileName("${Internal.Entry.Current.Directory}/" + transFileName2); JobEntryCopy datetransCopy = new JobEntryCopy(dateEntryTrans); datetransCopy.setDrawn(true); datetransCopy.setLocation(300, 300); jobMeta.addJobEntry(datetransCopy); //获取数据迁移转换 JobEntryTrans jobEntryTrans = new JobEntryTrans(); jobEntryTrans.setName("data_migration"); jobEntryTrans.setFileName("${Internal.Entry.Current.Directory}/" + transFileName1); JobEntryCopy transCopy = new JobEntryCopy(jobEntryTrans); transCopy.setDrawn(true); transCopy.setLocation(400, 100); jobMeta.addJobEntry(transCopy); //数据校验 JobEntrySimpleEval jobEntrySimpleEval = new JobEntrySimpleEval(); jobEntrySimpleEval.setName("检验字段的值"); jobEntrySimpleEval.valuetype = 1; jobEntrySimpleEval.setVariableName(etlDateIntegrationTransConfig.getTimeTableField()); jobEntrySimpleEval.setFieldName(etlDateIntegrationTransConfig.getTimeTableField()); jobEntrySimpleEval.setCompareValue("${NOW}"); jobEntrySimpleEval.successcondition = 5; JobEntryCopy simpleCopy = new JobEntryCopy(jobEntrySimpleEval); simpleCopy.setDrawn(true); simpleCopy.setLocation(500, 300); jobMeta.addJobEntry(simpleCopy); //成功 JobEntrySuccess jobEntrySuccess = new JobEntrySuccess(); jobEntrySuccess.setName("成功"); JobEntryCopy successCopy = new JobEntryCopy(jobEntrySuccess); successCopy.setLocation(700, 300); successCopy.setDrawn(true); jobMeta.addJobEntry(successCopy); //将job组件连接 jobMeta.addJobHop(new JobHopMeta(specialCopy, datetransCopy)); jobMeta.addJobHop(new JobHopMeta(datetransCopy, simpleCopy)); jobMeta.addJobHop(new JobHopMeta(simpleCopy, transCopy)); jobMeta.addJobHop(new JobHopMeta(transCopy, datetransCopy)); jobMeta.addJobHop(new JobHopMeta(simpleCopy, successCopy)); return jobMeta; }
这些是生成作业的代码
最后来看看效果吧,
这个就是这些代码自动生成的全部脚本了。第一次做这个,不太熟练,希望能得到你们宝贵的意见,谢谢。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。