赞
踩
# Shell command template used by datax.py to launch the core JVM engine; the
# ${...} placeholders are substituted later, %s slots take the property-file
# options and the classpath.
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (DEFAULT_PROPERTY_CONF, CLASS_PATH)
com.alibaba.datax.core.Engine
参数加载、容器创建、容器启动
// Engine entry point: binds column-cast rules and the plugin loader to the
// job configuration, decides between Job and TaskGroup container mode,
// initializes PerfTrace, then starts the chosen container.
public void start(Configuration allConf) {
    // Bind column conversion rules to this configuration.
    ColumnCast.bind(allConf);
    /**
     * Initialize the PluginLoader so plugin configurations can be looked up.
     */
    LoadUtil.bind(allConf);
    // Treat as a job unless the container model is explicitly "taskGroup".
    boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
            .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
    // JobContainer re-adjusts these values later, after schedule().
    int channelNumber = 0;
    AbstractContainer container;
    long instanceId;
    int taskGroupId = -1;
    if (isJob) {
        // Job mode: create the job container.
        allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
        container = new JobContainer(allConf);
        instanceId = allConf.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
    } else {
        // Not a job container: create a task-group container instead.
        container = new TaskGroupContainer(allConf);
        instanceId = allConf.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
        taskGroupId = allConf.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
        channelNumber = allConf.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
    }
    // perfTrace is enabled by default.
    boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
    boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);
    // Standalone-mode datax shell jobs (instanceId == -1) do not report perf logs.
    if (instanceId == -1) {
        perfReportEnable = false;
    }
    Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
    // Initialize PerfTrace for this job / task group.
    PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, traceEnable);
    perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber);
    container.start();
}
容器启动的执行过程
// Top-level job lifecycle: preHandle -> init -> prepare -> split -> schedule
// -> post -> postHandle -> hooks. In dryRun mode only preCheck() runs.
@Override
public void start() {
    LOG.info("DataX jobContainer starts job.");
    boolean hasException = false;
    boolean isDryRun = false;
    try {
        this.startTimeStamp = System.currentTimeMillis();
        isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
        if (isDryRun) {
            LOG.info("jobContainer starts to do preCheck ...");
            this.preCheck();
        } else {
            userConf = configuration.clone();
            LOG.debug("jobContainer starts to do preHandle ...");
            // Job-level pre-processing hook.
            this.preHandle();
            LOG.debug("jobContainer starts to do init ...");
            // Initialize the reader and writer plugins.
            this.init();
            LOG.info("jobContainer starts to do prepare ...");
            // Global preparation, e.g. odpswriter truncating the target table.
            this.prepare();
            LOG.info("jobContainer starts to do split ...");
            // Splitting into tasks is a key step to study.
            this.totalStage = this.split();
            LOG.info("jobContainer starts to do schedule ...");
            // Scheduling is a key step to study.
            this.schedule();
            LOG.debug("jobContainer starts to do post ...");
            this.post();
            LOG.debug("jobContainer starts to do postHandle ...");
            this.postHandle();
            LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
            this.invokeHooks();
        }
    } catch (Throwable e) {
        ... // elided in this excerpt
    } finally {
        ... // elided in this excerpt
    }
}
private int split() { // 调整channel数量 this.adjustChannelNumber(); if (this.needChannelNumber <= 0) { this.needChannelNumber = 1; } // 切分逻辑:读和写的数量必须要对应。自己写插件的时候需要注意 List<Configuration> readerTaskConfigs = this .doReaderSplit(this.needChannelNumber); int taskNumber = readerTaskConfigs.size(); List<Configuration> writerTaskConfigs = this .doWriterSplit(taskNumber); List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER); LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList)); /** * 输入是reader和writer的parameter list,输出是content下面元素的list */ List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs( readerTaskConfigs, writerTaskConfigs, transformerList); LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig)); this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig); return contentConfig.size(); }
// Derives needChannelNumber from the configured speed limits, in priority
// order: byte/record limits first (smaller wins), then an explicit channel
// count; if nothing is configured the job fails with CONFIG_ERROR.
private void adjustChannelNumber() {
    int needChannelNumberByByte = Integer.MAX_VALUE;
    int needChannelNumberByRecord = Integer.MAX_VALUE;
    // Upper bound on bytes transferred per second.
    // Configured in the job JSON as job.setting.speed.byte.
    boolean isByteLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
    if (isByteLimit) {
        long globalLimitedByteSpeed = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
        // Under byte-rate limiting the per-channel byte cap must be set,
        // otherwise this is a configuration error.
        Long channelLimitedByteSpeed = this.configuration
                .getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
        if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.CONFIG_ERROR,
                    "在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
        }
        needChannelNumberByByte =
                (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
        needChannelNumberByByte =
                needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
        LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
    }
    // Total TPS (records per second) limit.
    // Configured in the job JSON as job.setting.speed.record.
    boolean isRecordLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
    if (isRecordLimit) {
        long globalLimitedRecordSpeed = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);
        Long channelLimitedRecordSpeed = this.configuration.getLong(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
        if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) {
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
                    "在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
        }
        needChannelNumberByRecord =
                (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
        needChannelNumberByRecord =
                needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
        LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
    }
    // Take the smaller of the two derived channel counts.
    this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
            needChannelNumberByByte : needChannelNumberByRecord;
    // If either byte or record limiting produced a value, we are done.
    if (this.needChannelNumber < Integer.MAX_VALUE) {
        return;
    }
    // Explicit channel concurrency for the DataX job.
    // Configured in the job JSON as job.setting.speed.channel.
    boolean isChannelLimit = (this.configuration.getInt(
            CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
    if (isChannelLimit) {
        this.needChannelNumber = this.configuration.getInt(
                CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);
        LOG.info("Job set Channel-Number to " + this.needChannelNumber + " channels.");
        return;
    }
    throw DataXException.asDataXException(
            FrameworkErrorCode.CONFIG_ERROR,
            "Job运行速度必须设置");
}
// Assigns tasks to task groups, stamps the execute mode on each group, and
// runs them through a standalone scheduler; finally checks error limits.
private void schedule() {
    /**
     * The global speed and each channel's speed are expressed in B/s here.
     */
    int channelsPerTaskGroup = this.configuration.getInt(
            CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
    int taskNumber = this.configuration.getList(
            CoreConstant.DATAX_JOB_CONTENT).size();
    // needChannelNumber was computed in split(); cap it by the task count.
    this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
    PerfTrace.getInstance().setChannelNumber(needChannelNumber);
    /**
     * Use the configuration to decide which tasks each taskGroup runs.
     */
    // Fairly assign tasks to their task groups.
    List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration, this.needChannelNumber, channelsPerTaskGroup);
    LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());
    ExecuteMode executeMode = null;
    AbstractScheduler scheduler;
    try {
        executeMode = ExecuteMode.STANDALONE;
        scheduler = initStandaloneScheduler(this.configuration);
        // Stamp the executeMode onto every task group.
        for (Configuration taskGroupConfig : taskGroupConfigs) {
            taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
        }
        // NOTE(review): executeMode is always STANDALONE above, so this
        // LOCAL/DISTRIBUTE branch looks unreachable in this build — confirm.
        if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
            if (this.jobId <= 0) {
                throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
                        "在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 .");
            }
        }
        LOG.info("Running by {} Mode.", executeMode);
        this.startTransferTimeStamp = System.currentTimeMillis();
        // The actual scheduling call.
        scheduler.schedule(taskGroupConfigs);
        this.endTransferTimeStamp = System.currentTimeMillis();
    } catch (Exception e) {
        LOG.error("运行scheduler 模式[{}]出错.", executeMode);
        this.endTransferTimeStamp = System.currentTimeMillis();
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    }
    /**
     * Check how the tasks fared (error-record limits etc.).
     */
    this.checkLimit();
}
// Static helper that maps tasks onto task groups.
public final class JobAssignUtil {

    // Utility class: no instances.
    private JobAssignUtil() {
    }

    /**
     * Fairly assigns the job's tasks to task groups based on the channel
     * count and the channels allowed per group.
     */
    public static List<Configuration> assignFairly(Configuration configuration, int channelNumber, int channelsPerTaskGroup) {
        ... // elided in this excerpt
        // Core formula: how many TaskGroups are needed.
        int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);
        ... // elided in this excerpt
        // Distribute the N tasks across these task groups.
        List<Configuration> taskGroupConfig = doAssign(resourceMarkAndTaskIdMap, configuration, taskGroupNumber);
        // NOTE(review): excerpt is truncated — the full method presumably
        // returns taskGroupConfig; confirm against the upstream source.
    }
}
分组算法
/**
 * Round-robin assignment of tasks to task groups, interleaving across
 * resource marks (e.g. source databases) so each group's load is spread
 * over different resources. The intended effect, by example:
 * <pre>
 * database a has tables: 0, 1, 2
 * database b has tables: 3, 4
 * database c has tables: 5, 6, 7
 *
 * With 4 taskGroups the assignment result is:
 * taskGroup-0: 0, 4,
 * taskGroup-1: 3, 6,
 * taskGroup-2: 5, 2,
 * taskGroup-3: 1, 7
 * </pre>
 */
private static List<Configuration> doAssign(LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap, Configuration jobConfiguration, int taskGroupNumber) {
    List<Configuration> contentConfig = jobConfiguration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
    // Template for each task group: the job configuration minus job.content.
    Configuration taskGroupTemplate = jobConfiguration.clone();
    taskGroupTemplate.remove(CoreConstant.DATAX_JOB_CONTENT);
    List<Configuration> result = new LinkedList<Configuration>();
    List<List<Configuration>> taskGroupConfigList = new ArrayList<List<Configuration>>(taskGroupNumber);
    for (int i = 0; i < taskGroupNumber; i++) {
        taskGroupConfigList.add(new LinkedList<Configuration>());
    }
    // The longest per-resource task list bounds the number of rounds.
    int mapValueMaxLength = -1;
    List<String> resourceMarks = new ArrayList<String>();
    for (Map.Entry<String, List<Integer>> entry : resourceMarkAndTaskIdMap.entrySet()) {
        resourceMarks.add(entry.getKey());
        if (entry.getValue().size() > mapValueMaxLength) {
            mapValueMaxLength = entry.getValue().size();
        }
    }
    // Round-robin: take the head task of each resource in turn and append it
    // to the next task group (taskGroupIndex wraps modulo taskGroupNumber).
    int taskGroupIndex = 0;
    for (int i = 0; i < mapValueMaxLength; i++) {
        for (String resourceMark : resourceMarks) {
            if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0) {
                int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0);
                taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId));
                taskGroupIndex++;
                resourceMarkAndTaskIdMap.get(resourceMark).remove(0);
            }
        }
    }
    // Materialize each group's configuration from the template.
    Configuration tempTaskGroupConfig;
    for (int i = 0; i < taskGroupNumber; i++) {
        tempTaskGroupConfig = taskGroupTemplate.clone();
        tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
        tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);
        result.add(tempTaskGroupConfig);
    }
    return result;
}
调度核心代码实现
// Base scheduler: schedule() performs setup (elided here) and then launches
// every task group via startAllTaskGroup().
public abstract class AbstractScheduler {

    private static final Logger LOG = LoggerFactory
            .getLogger(AbstractScheduler.class);

    // Checker for the configured error-record limit.
    private ErrorRecordChecker errorLimit;
    private AbstractContainerCommunicator containerCommunicator;
    private Long jobId;

    public Long getJobId() {
        return jobId;
    }

    public AbstractScheduler(AbstractContainerCommunicator containerCommunicator) {
        this.containerCommunicator = containerCommunicator;
    }

    public void schedule(List<Configuration> configurations) {
        ... // elided in this excerpt
        // Core call: start all task groups.
        startAllTaskGroup(configurations);
        ... // elided in this excerpt
    }
}
public abstract class ProcessInnerScheduler extends AbstractScheduler { private ExecutorService taskGroupContainerExecutorService; public ProcessInnerScheduler(AbstractContainerCommunicator containerCommunicator) { super(containerCommunicator); } @Override public void startAllTaskGroup(List<Configuration> configurations) { // 使用了线程池 this.taskGroupContainerExecutorService = Executors .newFixedThreadPool(configurations.size()); for (Configuration taskGroupConfiguration : configurations) { TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration); this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner); } this.taskGroupContainerExecutorService.shutdown(); } }
// Runnable wrapper that drives one TaskGroupContainer on a pool thread.
public class TaskGroupContainerRunner implements Runnable {
    @Override
    public void run() {
        try {
            ...
            // Start the task-group container for this runner.
            this.taskGroupContainer.start();
            ...
        } catch (Throwable e) {
            ...
        }
    }
}
public class TaskGroupContainer extends AbstractContainer {
    // Main loop: while tasks remain and the running set is below the channel
    // quota, wrap each task in a TaskExecutor and start it.
    @Override
    public void start() {
        try {
            ...
            while (true) {
                ...
                while (iterator.hasNext() && runTasks.size() < channelNumber) {
                    ...
                    TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
                    taskStartTimeMap.put(taskId, System.currentTimeMillis());
                    // The actual execution entry point for a task.
                    taskExecutor.doStart();
                    ...
                }
                ...
            }
            ...
        } catch (Throwable e) {
            ...
        } finally {
            ...
        }
    }

    // NOTE(review): in the upstream DataX sources doStart() appears to be a
    // TaskExecutor method, not TaskGroupContainer's — confirm before reuse.
    public void doStart() {
        // Start the writer thread first.
        this.writerThread.start();
        // Fail fast if the writer died immediately: the writer cannot have
        // finished legitimately before the reader has even started.
        if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    this.taskCommunication.getThrowable());
        }
        // Start the reader thread.
        this.readerThread.start();
        // The reader may legitimately finish very quickly; only a FAILED
        // state means it crashed right at startup and must be surfaced now.
        if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    this.taskCommunication.getThrowable());
        }
    }
}
// Runs a reader plugin's task lifecycle (init -> prepare -> startRead ->
// post -> destroy) on its own thread, wrapping each phase in a PerfRecord.
public class ReaderRunner extends AbstractRunner implements Runnable {

    private static final Logger LOG = LoggerFactory
            .getLogger(ReaderRunner.class);

    private RecordSender recordSender;

    public void setRecordSender(RecordSender recordSender) {
        this.recordSender = recordSender;
    }

    public ReaderRunner(AbstractTaskPlugin abstractTaskPlugin) {
        super(abstractTaskPlugin);
    }

    @Override
    public void run() {
        assert null != this.recordSender;
        Reader.Task taskReader = (Reader.Task) this.getPlugin();
        // Tracks waitWriterTime; ended only in the finally block.
        PerfRecord channelWaitWrite = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WAIT_WRITE_TIME);
        try {
            channelWaitWrite.start();
            LOG.debug("task reader starts to do init ...");
            PerfRecord initPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_INIT);
            initPerfRecord.start();
            taskReader.init();
            initPerfRecord.end();
            LOG.debug("task reader starts to do prepare ...");
            PerfRecord preparePerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_PREPARE);
            preparePerfRecord.start();
            taskReader.prepare();
            preparePerfRecord.end();
            LOG.debug("task reader starts to read ...");
            PerfRecord dataPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DATA);
            // Core phase: the plugin pulls data and feeds the record sender.
            dataPerfRecord.start();
            taskReader.startRead(recordSender);
            recordSender.terminate();
            dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
            dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
            dataPerfRecord.end();
            LOG.debug("task reader starts to do post ...");
            PerfRecord postPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_POST);
            postPerfRecord.start();
            taskReader.post();
            postPerfRecord.end();
            // automatic flush
            // super.markSuccess(); must NOT mark success here — success is
            // signaled by the WriterRunner; otherwise the reader could finish
            // first while the writer is still running (a serious bug).
        } catch (Throwable e) {
            LOG.error("Reader runner Received Exceptions:", e);
            super.markFail(e);
        } finally {
            LOG.debug("task reader starts to do destroy ...");
            PerfRecord desPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DESTROY);
            desPerfRecord.start();
            super.destroy();
            desPerfRecord.end();
            channelWaitWrite.end(super.getRunnerCommunication().getLongCounter(CommunicationTool.WAIT_WRITER_TIME));
            long transformerUsedTime = super.getRunnerCommunication().getLongCounter(CommunicationTool.TRANSFORMER_USED_TIME);
            if (transformerUsedTime > 0) {
                PerfRecord transformerRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.TRANSFORMER_TIME);
                transformerRecord.start();
                transformerRecord.end(transformerUsedTime);
            }
        }
    }

    public void shutdown() {
        recordSender.shutdown();
    }
}
/**
 * MySQL reader plugin entry: delegates the actual reading to the shared
 * relational-database reader task.
 */
public class MysqlReader extends Reader {
    @Override
    public void startRead(RecordSender recordSender) {
        // Pull the configured JDBC fetch size, then hand off to the common
        // RDBMS reader, which runs the query and pushes records downstream.
        final int configuredFetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
        this.commonRdbmsReaderTask.startRead(
                this.readerSliceConfig,
                recordSender,
                super.getTaskPluginCollector(),
                configuredFetchSize);
    }
}
// Shared relational-database reader used by concrete reader plugins.
public class CommonRdbmsReader {
    public static class Task {
        ...
        // Executes the query and streams each result row to the writer side.
        public void startRead(Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector, int fetchSize) {
            ...
            try {
                ...
                while (rs.next()) {
                    rsNextUsedTime += (System.nanoTime() - lastTime);
                    // Core logic: convert and forward a single row.
                    this.transportOneRecord(recordSender, rs, metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
                    lastTime = System.nanoTime();
                }
                ...
            } catch (Exception e) {
                throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
            } finally {
                DBUtil.closeDBResources(null, conn);
            }
        }
    }

    // Builds a Record from the current ResultSet row and pushes it onward.
    protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, TaskPluginCollector taskPluginCollector) {
        Record record = buildRecord(recordSender, rs, metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
        // Every processed row is sent to the writer.
        recordSender.sendToWriter(record);
        return record;
    }
}
// Buffers records between reader and writer; flushes the buffer into the
// channel when it fills up.
public class BufferedRecordExchanger implements RecordSender, RecordReceiver {
    @Override
    public void sendToWriter(Record record) {
        ....
        if (isFull) {
            // Buffer is full: push everything to the channel.
            flush();
        }
        ...
    }

    @Override
    public void flush() {
        if (shutdown) {
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        // Core call: push the buffered records to the writer via the channel.
        this.channel.pushAll(this.buffer);
        this.buffer.clear();
        this.bufferIndex = 0;
        this.memoryBytes.set(0);
    }
}
// Transport channel between reader and writer; statPush implements flow
// control by sleeping when the measured speed exceeds the configured limit.
public abstract class Channel {
    // Pushes a batch of records and then records throughput statistics.
    public void pushAll(final Collection<Record> rs) {
        Validate.notNull(rs);
        Validate.noNullElements(rs);
        this.doPushAll(rs);
        // rs.size(): number of records
        // this.getByteSize(rs): data volume in bytes
        this.statPush(rs.size(), this.getByteSize(rs));
    }

    private void statPush(long recordSize, long byteSize) {
        ...
        if (interval - this.flowControlInterval >= 0) {
            ...
            // Record-count-based rate limiting.
            if (isChannelRecordSpeedLimit) {
                long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) - CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
                // Current speed above the limit: schedule a sleep.
                if (currentRecordSpeed > this.recordSpeed) {
                    // Sleep time implied by the record limit.
                    recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed - interval;
                }
            }
            // Take the larger of the byte-limit and record-limit sleep times.
            long sleepTime = byteLimitSleepTime < recordLimitSleepTime ? recordLimitSleepTime : byteLimitSleepTime;
            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            ...
        }
    }
}
{ "core": { "transport": { "channel": { "speed": { "byte": 1048576 } } } }, "job": { "setting": { "speed": { "byte": 5242880 } }, ... } }
{ "core": { "transport": { "channel": { "speed": { "record": 100 } } } }, "job": { "setting": { "speed": { "record": 500 } }, ... } }
只有在上面两种都未设置时才生效;如果上面两种同时设置,则取较小值作为最终的 channel 数。
{
"job": {
"setting": {
"speed": {
"channel": 5
}
},
...
}
}
4G 或者 8G
这个也可以根据实际情况来调整,例如:python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。