
DataX: Principles and Performance Optimization


Introduction to DataX

DataX is an open-source tool from Alibaba for synchronizing data between heterogeneous data sources. Thanks to its compact design and clean abstractions it synchronizes data very efficiently, and it is widely used in the data teams of many companies. Running DataX against an ordinary Aliyun RDS instance, our company achieved a peak transfer rate of over 30 MB/s across the public internet, from Aliyun's Hangzhou region to an AWS EMR cluster in us-west (Oregon). A full transfer of over a hundred million records, roughly 30 GB of data, finishes in under 30 minutes at best. Keep in mind that a dedicated trans-Pacific line costs at least a thousand RMB per month for every 1 Mbps of bandwidth, yet the public internet can deliver comparable stability. This article explains, starting from DataX's design, how we achieved this.

How DataX Works

Before walking through how DataX works, a few concepts need to be defined:

  • Job: a Job is DataX's description of a synchronization run from one source to one destination; it is the smallest business unit of DataX synchronization. For example: synchronizing one MySQL table into a specific partition of a Hive table.

  • Task: a Task is the smallest execution unit obtained by splitting a Job in order to maximize concurrency. For example: a Job reading a MySQL database sharded into 1024 tables is split into 1024 read Tasks, which run with some degree of concurrency; alternatively, one large table can be split by id into 1024 slices, with the slice tasks running concurrently.

  • TaskGroup: a set of Tasks. The Tasks executed under the same TaskGroupContainer are called a TaskGroup.

  • JobContainer: the Job executor, the unit responsible for global splitting, scheduling, pre- and post-statements and other Job-level work.

  • TaskGroupContainer: the TaskGroup executor, the unit responsible for running a group of Tasks.

Job and Task are DataX's two levels of abstraction; both will come up again in the source-code analysis below.
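
To make the Job concept concrete, here is a minimal job configuration of the kind DataX accepts, sketching the MySQL-to-Hive/HDFS example above. It assumes the common mysqlreader and hdfswriter plugins; the parameter names follow their public documentation, while all values (table, path, credentials) are made up for illustration.

{
  "job": {
    "setting": {
      "speed": { "channel": 5 }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "reader_user",
            "password": "******",
            "column": ["id", "name", "gmt_modified"],
            "splitPk": "id",
            "connection": [
              { "table": ["orders"], "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/shop"] }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://emr-cluster",
            "path": "/warehouse/shop.db/orders/dt=2016-01-01",
            "fileName": "orders",
            "fileType": "orc",
            "fieldDelimiter": "\t",
            "writeMode": "append",
            "column": [
              { "name": "id", "type": "BIGINT" },
              { "name": "name", "type": "STRING" },
              { "name": "gmt_modified", "type": "TIMESTAMP" }
            ]
          }
        }
      }
    ]
  }
}

One such JSON file describes one Job; the splitPk and channel settings drive how that Job is broken into Tasks and how many of them run concurrently.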

DataX's processing flow can be described as follows:

  1. A single data synchronization run in DataX is called a Job. When DataX receives a Job it starts a process to carry out the whole synchronization. The DataX Job module is the central management node of a single run: it handles data cleanup, sub-task splitting (turning the single job into multiple sub-Tasks), TaskGroup management, and so on.

  2. Once the DataX Job starts, it splits the Job into multiple small Tasks (sub-tasks) according to the splitting strategy of the source end, so that they can run concurrently. A Task is the smallest unit of a DataX job; each Task is responsible for synchronizing a portion of the data.

  3. After splitting, the DataX Job calls the Scheduler module, which, based on the configured concurrency, regroups the Tasks into TaskGroups. Each TaskGroup runs all of its assigned Tasks at a certain concurrency; by default a single TaskGroup runs 5 Tasks concurrently.

  4. Every Task is started by its TaskGroup. Once started, a Task runs a fixed Reader -> Channel -> Writer thread pipeline to do the synchronization work.

  5. Once the DataX job is running, the Job module monitors and waits for the TaskGroup modules to finish; when all TaskGroups have completed, the Job exits successfully. Otherwise it exits abnormally with a non-zero exit code.

The flow above can be summarized as: a Job is split into Tasks, the Tasks are grouped into TaskGroups, and each Task runs as a Reader -> Channel -> Writer pipeline (the original post illustrates this with an architecture diagram).

The Channel is the data exchange conduit connecting the Reader and the Writer: all data flows through a Channel, one Channel represents one concurrent transfer pipeline, and transfer-rate throttling is applied at the Channel level. Next we turn to the source code, extract its core logic, and use a MySQL-to-HDFS transfer as the running example to analyze the workflow. The source walk-through covers:

  • the DataX workflow

  • the DataX plugin mechanism

  • how DataX implements the synchronization itself

DataX Source Code Analysis

The DataX workflow

public class Engine {
    private static final Logger LOG = LoggerFactory.getLogger(Engine.class);
    private static String RUNTIME_MODE;

    public void start(Configuration allConf) {
        boolean isJob = !("taskGroup".equalsIgnoreCase(allConf.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
        // The JobContainer will set and adjust this value again after schedule()
        int channelNumber = 0;
        AbstractContainer container;
        long instanceId;
        int taskGroupId = -1;
        if (isJob) {
            allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
            container = new JobContainer(allConf);
            instanceId = allConf.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
        } else {
            container = new TaskGroupContainer(allConf);
            instanceId = allConf.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
            taskGroupId = allConf.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
            channelNumber = allConf.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
        }
        container.start();
    }
}

A Job instance runs inside a JobContainer. The JobContainer is the master of all tasks: it handles initialization, splitting, scheduling, running, cleanup, monitoring and reporting, but it does not perform any actual data transfer itself.

public class JobContainer extends AbstractContainer {
    private static final Logger LOG = LoggerFactory.getLogger(JobContainer.class);

    public JobContainer(Configuration configuration) {
        super(configuration);
    }

    /**
     * All of the jobContainer's work happens in start(): init, prepare, split,
     * schedule, as well as destroy and statistics.
     */
    @Override
    public void start() {
        LOG.info("DataX jobContainer starts job.");
        try {
            userConf = configuration.clone();
            this.init();
            this.prepare();
            this.totalStage = this.split();
            this.schedule();
        } catch (Throwable e) {
            Communication communication = super.getContainerCommunicator().collect();
            // The state before reporting; it does not need to be set manually
            // communication.setState(State.FAILED);
            communication.setThrowable(e);
            communication.setTimestamp(this.endTimeStamp);
            Communication tempComm = new Communication();
            tempComm.setTimestamp(this.startTransferTimeStamp);
            Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
            super.getContainerCommunicator().report(reportCommunication);
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    /**
     * Initialization of the reader and writer.
     */
    private void init() {
        Thread.currentThread().setName("job-" + this.jobId);
        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
                this.getContainerCommunicator());
        // The Reader must be initialized first, then the Writer
        this.jobReader = this.initJobReader(jobPluginCollector);
        this.jobWriter = this.initJobWriter(jobPluginCollector);
    }

    /**
     * schedule() first assigns the reader/writer split results from the previous step
     * to concrete taskGroupContainers, then invokes the scheduling strategy of the
     * configured execution mode to launch all tasks.
     */
    private void schedule() {
        /**
         * The global speed and the per-channel speed here are configured in B/s.
         */
        int channelsPerTaskGroup = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
        int taskNumber = this.configuration.getList(
                CoreConstant.DATAX_JOB_CONTENT).size();
        this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
        /**
         * Determine from the configuration which tasks each taskGroup should run.
         * Load markers on tasks are taken into account to balance the assignment.
         */
        List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
                this.needChannelNumber, channelsPerTaskGroup);
        LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());
        AbstractScheduler scheduler;
        try {
            scheduler = initStandaloneScheduler(this.configuration);
            this.startTransferTimeStamp = System.currentTimeMillis();
            scheduler.schedule(taskGroupConfigs);
            this.endTransferTimeStamp = System.currentTimeMillis();
        } catch (Exception e) {
            LOG.error("Error while running the scheduler.");
            this.endTransferTimeStamp = System.currentTimeMillis();
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    private AbstractScheduler initStandaloneScheduler(Configuration configuration) {
        AbstractContainerCommunicator containerCommunicator = new StandAloneJobContainerCommunicator(configuration);
        super.setContainerCommunicator(containerCommunicator);
        return new StandAloneScheduler(containerCommunicator);
    }
}
public abstract class AbstractScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractScheduler.class);

    public void schedule(List<Configuration> configurations) {
        /**
         * Register a Communication for each taskGroupContainer.
         */
        this.containerCommunicator.registerCommunication(configurations);
        int totalTasks = calculateTaskCount(configurations);
        startAllTaskGroup(configurations);
        try {
            while (true) {
                Communication nowJobContainerCommunication = this.containerCommunicator.collect();
                long now = System.currentTimeMillis();
                // Report on a fixed interval
                if (now - lastReportTimeStamp > jobReportIntervalInMillSec) {
                    Communication reportCommunication = CommunicationTool
                            .getReportCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);
                    this.containerCommunicator.report(reportCommunication);
                }
                if (nowJobContainerCommunication.getState() == State.SUCCEEDED) {
                    LOG.info("Scheduler accomplished all tasks.");
                    break;
                }
                if (nowJobContainerCommunication.getState() == State.FAILED) {
                    dealFailedStat(this.containerCommunicator, nowJobContainerCommunication.getThrowable());
                }
                Thread.sleep(jobSleepIntervalInMillSec);
            }
        } catch (InterruptedException e) {
            // Exit with a failed status
            LOG.error("Caught InterruptedException!", e);
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    public void startAllTaskGroup(List<Configuration> configurations) {
        this.taskGroupContainerExecutorService = Executors
                .newFixedThreadPool(configurations.size());
        for (Configuration taskGroupConfiguration : configurations) {
            TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
            this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
        }
        this.taskGroupContainerExecutorService.shutdown();
    }

    public void dealFailedStat(AbstractContainerCommunicator frameworkCollector, Throwable throwable) {
        this.taskGroupContainerExecutorService.shutdownNow();
    }
}
public class TaskGroupContainer extends AbstractContainer {
    private static final Logger LOG = LoggerFactory.getLogger(TaskGroupContainer.class);

    @Override
    public void start() {
        try {
            while (true) {
                // 1. Check the state of each task
                boolean failedOrKilled = false;
                Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
                for (Map.Entry<Integer, Communication> entry : communicationMap.entrySet()) {
                    Integer taskId = entry.getKey();
                    Communication taskCommunication = entry.getValue();
                    if (!taskCommunication.isFinished()) {
                        continue;
                    }
                    TaskExecutor taskExecutor = removeTask(runTasks, taskId);
                    if (taskCommunication.getState() == State.FAILED) {
                        failedOrKilled = true;
                        break;
                    } else if (taskCommunication.getState() == State.SUCCEEDED) {
                        Long taskStartTime = taskStartTimeMap.get(taskId);
                        if (taskStartTime != null) {
                            Long usedTime = System.currentTimeMillis() - taskStartTime;
                            LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms",
                                    this.taskGroupId, taskId, usedTime);
                            taskStartTimeMap.remove(taskId);
                            taskConfigMap.remove(taskId);
                        }
                    }
                }
                // 2. If the overall state of this taskGroup's executors is failed, report the error
                if (failedOrKilled) {
                    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                    throw DataXException.asDataXException(
                            FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
                }
                // 3. There are pending tasks and the number of running tasks is below the channel limit
                Iterator<Configuration> iterator = taskQueue.iterator();
                while (iterator.hasNext() && runTasks.size() < channelNumber) {
                    Configuration taskConfig = iterator.next();
                    Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
                    Configuration taskConfigForRun = taskConfig.clone();
                    TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun);
                    taskStartTimeMap.put(taskId, System.currentTimeMillis());
                    taskExecutor.doStart();
                    iterator.remove();
                    runTasks.add(taskExecutor);
                    LOG.info("taskGroup[{}] taskId[{}] is started",
                            this.taskGroupId, taskId);
                }
                // 4. The task queue is empty and all executors have finished; collected state is SUCCEEDED ---> success
                if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
                    // Report once more even on success; otherwise, when tasks finish very quickly,
                    // the collected statistics would be inaccurate
                    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                    LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);
                    break;
                }
            }
        } catch (Throwable e) {
            Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
            if (nowTaskGroupContainerCommunication.getThrowable() == null) {
                nowTaskGroupContainerCommunication.setThrowable(e);
            }
            nowTaskGroupContainerCommunication.setState(State.FAILED);
            this.containerCommunicator.report(nowTaskGroupContainerCommunication);
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }
}
/**
 * TaskExecutor executes one complete task,
 * containing exactly one reader and one writer (1:1).
 */
class TaskExecutor {
    private Thread readerThread;
    private Thread writerThread;
    private ReaderRunner readerRunner;
    private WriterRunner writerRunner;

    public TaskExecutor(Configuration taskConf, int attemptCount) {
        writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
        // create the writer thread
        this.writerThread = new Thread(writerRunner,
                String.format("%d-%d-%d-writer",
                        jobId, taskGroupId, this.taskId));
        // create the reader thread
        readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerInfoExecs);
        this.readerThread = new Thread(readerRunner,
                String.format("%d-%d-%d-reader",
                        jobId, taskGroupId, this.taskId));
    }

    public void doStart() {
        this.writerThread.start();
        // The writer cannot have finished yet, since the reader has not started
        if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    this.taskCommunication.getThrowable());
        }
        this.readerThread.start();
        // The reader may finish very quickly here
        if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
            // The reader thread may die immediately after starting; in that case
            // the exception must be thrown right away
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    this.taskCommunication.getThrowable());
        }
    }
}

From the overall flow above we can see that the JobContainer starts all TaskGroupContainers through a thread pool and then polls their running state. Each TaskGroupContainer in turn executes its assigned Task instances, with the concurrency given by its allocated channel count.

Plugin Mechanism

In the init step of the workflow, the jobReader and jobWriter implementations we saw are produced dynamically through plugins. jobReader and jobWriter correspond to DataX's Job concept, while the readerRunner and writerRunner initialized in TaskExecutor correspond to the Task model. Through this plugin mechanism DataX supports reading and writing across dozens of different data sources, and new data sources can be added with little effort.
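
On disk, each plugin ships as its own directory under DataX's plugin/reader or plugin/writer folder, together with a small JSON descriptor that the framework loads as the plugin configuration (the pluginConf seen in the loader code further below, whose "class" key drives class loading). The field set shown here is illustrative of a typical descriptor, not an exhaustive specification:

{
  "name": "mysqlreader",
  "class": "com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader",
  "description": "reads data from MySQL via JDBC",
  "developer": "alibaba"
}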

Job initialization

public class JobContainer extends AbstractContainer {
    // Initialize the reader job and return a Reader.Job
    private Reader.Job initJobReader(
            JobPluginCollector jobPluginCollector) {
        this.readerPluginName = this.configuration.getString(
                CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
        Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
                PluginType.READER, this.readerPluginName);
        // set the reader's own job configuration
        jobReader.setPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
        // set the peer plugin's (the writer's) configuration on the reader
        jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
                CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
        jobReader.setJobPluginCollector(jobPluginCollector);
        jobReader.init();
        classLoaderSwapper.restoreCurrentThreadClassLoader();
        return jobReader;
    }
}

The plugin loader broadly distinguishes three plugin types: reader, transformer (not yet implemented at the time of writing) and writer. Reader and writer plugins additionally come in two runtime flavors, Job and Task, which load different classes.

public class LoadUtil {
    // Load a Job-level plugin; both readers and writers are loaded this way
    public static AbstractJobPlugin loadJobPlugin(PluginType pluginType,
                                                  String pluginName) {
        Class<? extends AbstractPlugin> clazz = LoadUtil.loadPluginClass(
                pluginType, pluginName, ContainerType.Job);
        try {
            AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz.newInstance();
            jobPlugin.setPluginConf(getPluginConf(pluginType, pluginName));
            return jobPlugin;
        } catch (Exception e) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    String.format("DataX cannot find the Job configuration for plugin [%s].",
                            pluginName), e);
        }
    }

    // Reflectively load the concrete plugin class
    private static synchronized Class<? extends AbstractPlugin> loadPluginClass(
            PluginType pluginType, String pluginName,
            ContainerType pluginRunType) {
        Configuration pluginConf = getPluginConf(pluginType, pluginName);
        JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
        try {
            return (Class<? extends AbstractPlugin>) jarLoader
                    .loadClass(pluginConf.getString("class") + "$"
                            + pluginRunType.value());
        } catch (Exception e) {
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
        }
    }

    public static synchronized JarLoader getJarLoader(PluginType pluginType,
                                                      String pluginName) {
        Configuration pluginConf = getPluginConf(pluginType, pluginName);
        JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType,
                pluginName));
        if (null == jarLoader) {
            String pluginPath = pluginConf.getString("path");
            if (StringUtils.isBlank(pluginPath)) {
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        String.format(
                                "Illegal path for %s plugin [%s]!",
                                pluginType, pluginName));
            }
            jarLoader = new JarLoader(new String[]{pluginPath});
            jarLoaderCenter.put(generatePluginKey(pluginType, pluginName),
                    jarLoader);
        }
        return jarLoader;
    }
}

/**
 * Provides jar-isolated class loading: the given paths, their subdirectories,
 * and the jar files under them are all added to the classpath.
 */
public class JarLoader extends URLClassLoader {
    public JarLoader(String[] paths) {
        this(paths, JarLoader.class.getClassLoader());
    }

    public JarLoader(String[] paths, ClassLoader parent) {
        super(getURLs(paths), parent);
    }

    private static URL[] getURLs(String[] paths) {
        Validate.isTrue(null != paths && 0 != paths.length,
                "jar path must not be empty.");
        List<String> dirs = new ArrayList<String>();
        for (String path : paths) {
            dirs.add(path);
            JarLoader.collectDirs(path, dirs);
        }
        List<URL> urls = new ArrayList<URL>();
        for (String path : dirs) {
            urls.addAll(doGetURLs(path));
        }
        return urls.toArray(new URL[0]);
    }

    private static void collectDirs(String path, List<String> collector) {
        if (null == path || StringUtils.isBlank(path)) {
            return;
        }
        File current = new File(path);
        if (!current.exists() || !current.isDirectory()) {
            return;
        }
        for (File child : current.listFiles()) {
            if (!child.isDirectory()) {
                continue;
            }
            collector.add(child.getAbsolutePath());
            collectDirs(child.getAbsolutePath(), collector);
        }
    }
}
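
The "$" + pluginRunType.value() suffix in loadPluginClass explains the shape every plugin takes: one outer class with static inner Job and Task classes. A skeleton reader plugin therefore looks roughly like the following; this is a sketch written against the public plugin SPI with the method set trimmed to the essentials, and DemoReader itself is a made-up example, not an existing DataX plugin.

import java.util.ArrayList;
import java.util.List;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;

public class DemoReader extends Reader {

    // Job side: runs once inside the JobContainer and splits the work into slices
    public static class Job extends Reader.Job {
        private Configuration originalConfig;

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            // Return one Configuration per Task; the framework pairs each with a writer slice
            List<Configuration> splitted = new ArrayList<Configuration>();
            for (int i = 0; i < adviceNumber; i++) {
                splitted.add(this.originalConfig.clone());
            }
            return splitted;
        }

        @Override
        public void destroy() {
        }
    }

    // Task side: runs inside a TaskGroupContainer and pushes records into the channel
    public static class Task extends Reader.Task {
        @Override
        public void init() {
        }

        @Override
        public void startRead(RecordSender recordSender) {
            // Emit a single demo record; a real reader would loop over its data slice
            Record record = recordSender.createRecord();
            record.addColumn(new StringColumn("hello datax"));
            recordSender.sendToWriter(record);
            recordSender.flush();
        }

        @Override
        public void destroy() {
        }
    }
}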

Task initialization

class TaskExecutor {
    private AbstractRunner generateRunner(PluginType pluginType) {
        return generateRunner(pluginType, null);
    }

    private AbstractRunner generateRunner(PluginType pluginType, List<TransformerExecution> transformerInfoExecs) {
        AbstractRunner newRunner = null;
        TaskPluginCollector pluginCollector;
        switch (pluginType) {
            case READER:
                newRunner = LoadUtil.loadPluginRunner(pluginType,
                        this.taskConfig.getString(CoreConstant.JOB_READER_NAME));
                newRunner.setJobConf(this.taskConfig.getConfiguration(
                        CoreConstant.JOB_READER_PARAMETER));
                pluginCollector = ClassUtil.instantiate(
                        taskCollectorClass, AbstractTaskPluginCollector.class,
                        configuration, this.taskCommunication,
                        PluginType.READER);
                RecordSender recordSender;
                if (transformerInfoExecs != null && transformerInfoExecs.size() > 0) {
                    recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId,
                            this.channel, this.taskCommunication, pluginCollector, transformerInfoExecs);
                } else {
                    recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
                }
                ((ReaderRunner) newRunner).setRecordSender(recordSender);
                /**
                 * Set the taskPlugin's collector, used for dirty-data handling and job/task communication.
                 */
                newRunner.setTaskPluginCollector(pluginCollector);
                break;
            case WRITER:
                newRunner = LoadUtil.loadPluginRunner(pluginType,
                        this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME));
                newRunner.setJobConf(this.taskConfig
                        .getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));
                pluginCollector = ClassUtil.instantiate(
                        taskCollectorClass, AbstractTaskPluginCollector.class,
                        configuration, this.taskCommunication,
                        PluginType.WRITER);
                ((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
                        this.channel, pluginCollector));
                /**
                 * Set the taskPlugin's collector, used for dirty-data handling and job/task communication.
                 */
                newRunner.setTaskPluginCollector(pluginCollector);
                break;
            default:
                throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR, "Cant generateRunner for:" + pluginType);
        }
        newRunner.setTaskGroupId(taskGroupId);
        newRunner.setTaskId(this.taskId);
        newRunner.setRunnerCommunication(this.taskCommunication);
        return newRunner;
    }
}

public class LoadUtil {
    /**
     * Load the corresponding runner based on the plugin type and name.
     *
     * @param pluginType
     * @param pluginName
     * @return
     */
    public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName) {
        AbstractTaskPlugin taskPlugin = LoadUtil.loadTaskPlugin(pluginType,
                pluginName);
        switch (pluginType) {
            case READER:
                return new ReaderRunner(taskPlugin);
            case WRITER:
                return new WriterRunner(taskPlugin);
            default:
                throw DataXException.asDataXException(
                        FrameworkErrorCode.RUNTIME_ERROR,
                        String.format("Plugin [%s] must be of type [reader] or [writer]!",
                                pluginName));
        }
    }
}

Synchronization Implementation

This part is the execution logic of the concrete Tasks produced by the split step. The Task splitting logic is as follows:

public class JobContainer extends AbstractContainer {
    private static final Logger LOG = LoggerFactory.getLogger(JobContainer.class);

    /**
     * Perform the finest-grained split of the reader and writer. Note that the writer's
     * split must mirror the reader's split so that both sides produce the same number of
     * slices, satisfying the 1:1 channel model. The reader and writer configurations can
     * therefore be merged one-to-one; the merged result is then shuffled to avoid the
     * long-tail effect that a fixed ordering would impose on the read and write sides.
     */
    private int split() {
        this.adjustChannelNumber();
        if (this.needChannelNumber <= 0) {
            this.needChannelNumber = 1;
        }
        List<Configuration> readerTaskConfigs = this
                .doReaderSplit(this.needChannelNumber);
        int taskNumber = readerTaskConfigs.size();
        List<Configuration> writerTaskConfigs = this
                .doWriterSplit(taskNumber);
        List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
        LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList));
        /**
         * Input: the reader and writer parameter lists; output: the list of elements under job.content.
         */
        List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
                readerTaskConfigs, writerTaskConfigs, transformerList);
        LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentConfig));
        this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);
        return contentConfig.size();
    }
}
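
doReaderSplit ultimately delegates to the reader plugin's own split logic. For a single MySQL table with a numeric splitPk, the idea is simply to cut the [min, max] range of the key into as many slices as there are channels and to give each Task its own WHERE clause. The following is a simplified sketch of that idea, not the exact CommonRdbmsReader code, which also handles string keys, NULL values, multiple tables and so on.

import java.util.ArrayList;
import java.util.List;

// Simplified illustration of primary-key range splitting
public final class RangeSplitSketch {

    public static List<String> splitByPk(String table, String pk,
                                         long min, long max, int sliceCount) {
        List<String> whereClauses = new ArrayList<String>();
        long step = Math.max(1, (max - min + 1) / sliceCount);
        long lower = min;
        while (lower <= max) {
            long upper = Math.min(lower + step, max + 1);
            // Each slice becomes one read Task: SELECT ... FROM <table> WHERE <clause>
            whereClauses.add(String.format("%s >= %d AND %s < %d", pk, lower, pk, upper));
            lower = upper;
        }
        return whereClauses;
    }

    public static void main(String[] args) {
        for (String where : splitByPk("orders", "id", 1, 1000000, 4)) {
            System.out.println(where);
        }
    }
}

Each generated WHERE clause ends up as the querySql of one read Task, which is why specifying splitPk directly determines how much read concurrency a single table can get.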

Every Task runs the same logic and flow. Below, reading from MySQL and writing to HDFS are used to show the read and write paths.

// Execution entry point of the reader for a single slice
public class ReaderRunner extends AbstractRunner implements Runnable {
    @Override
    public void run() {
        Reader.Task taskReader = (Reader.Task) this.getPlugin();
        taskReader.init();
        taskReader.prepare();
        taskReader.startRead(recordSender);
        recordSender.terminate();
    }
}

public class MysqlReader extends Reader {
    @Override
    public void startRead(RecordSender recordSender) {
        int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
        this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
                super.getTaskPluginCollector(), fetchSize);
    }
}

public class CommonRdbmsReader {
    public static class Task {
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        public void startRead(Configuration readerSliceConfig,
                              RecordSender recordSender,
                              TaskPluginCollector taskPluginCollector, int fetchSize) {
            String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
            String table = readerSliceConfig.getString(Key.TABLE);
            PerfTrace.getInstance().addTaskDetails(taskId, table + "," + basicMsg);
            LOG.info("Begin to read record by Sql: [{}\n] {}.",
                    querySql, basicMsg);
            Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl,
                    username, password);
            int columnNumber = 0;
            ResultSet rs = null;
            try {
                rs = DBUtil.query(conn, querySql, fetchSize);
                while (rs.next()) {
                    // Put the record into the channel; the writer pulls records from it to write
                    this.transportOneRecord(recordSender, rs,
                            metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
                }
            } catch (Exception e) {
                throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
            } finally {
                DBUtil.closeDBResources(null, conn);
            }
        }
    }
}

// Execution entry point of the writer for a single slice

public class WriterRunner extends AbstractRunner implements Runnable {
    @Override
    public void run() {
        Writer.Task taskWriter = (Writer.Task) this.getPlugin();
        taskWriter.init();
        taskWriter.prepare();
        taskWriter.startWrite(recordReceiver);
    }
}

public class HdfsWriter extends Writer {
    public static class Task extends Writer.Task {
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            LOG.info("begin do write...");
            LOG.info(String.format("write to file : [%s]", this.fileName));
            if (fileType.equalsIgnoreCase("TEXT")) {
                // write a TEXT file
                hdfsHelper.textFileStartWrite(lineReceiver, this.writerSliceConfig, this.fileName,
                        this.getTaskPluginCollector());
            } else if (fileType.equalsIgnoreCase("ORC")) {
                // write an ORC file
                hdfsHelper.orcFileStartWrite(lineReceiver, this.writerSliceConfig, this.fileName,
                        this.getTaskPluginCollector());
            }
            LOG.info("end do write");
        }
    }
}

public class HdfsHelper {
    public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName, TaskPluginCollector taskPluginCollector) {
        try {
            RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, outputPath.toString(), Reporter.NULL);
            Record record = null;
            while ((record = lineReceiver.getFromReader()) != null) {
                MutablePair<Text, Boolean> transportResult = transportOneRecord(record, fieldDelimiter, columns, taskPluginCollector);
                if (!transportResult.getRight()) {
                    writer.write(NullWritable.get(), transportResult.getLeft());
                }
            }
            writer.close(Reporter.NULL);
        } catch (Exception e) {
            String message = String.format("An IO exception occurred while writing file [%s]; please check that the network is healthy!", fileName);
            LOG.error(message);
            Path path = new Path(fileName);
            deleteDir(path.getParent());
            throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
        }
    }
}

The reader and writer are connected through a BufferedRecordExchanger, which internally uses a MemoryChannel implemented on top of an ArrayBlockingQueue.

public class BufferedRecordExchanger implements RecordSender, RecordReceiver {
    @Override
    public void sendToWriter(Record record) {
        if (shutdown) {
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        Validate.notNull(record, "record must not be null.");
        if (record.getMemorySize() > this.byteCapacity) {
            this.pluginCollector.collectDirtyRecord(record,
                    new Exception(String.format("A single record exceeds the size limit; the current limit is %s", this.byteCapacity)));
            return;
        }
        boolean isFull = (this.bufferIndex >= this.bufferSize
                || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
        if (isFull) {
            flush();
        }
        this.buffer.add(record);
        this.bufferIndex++;
        memoryBytes.addAndGet(record.getMemorySize());
    }

    @Override
    public void flush() {
        if (shutdown) {
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        this.channel.pushAll(this.buffer);
        this.buffer.clear();
        this.bufferIndex = 0;
        this.memoryBytes.set(0);
    }

    @Override
    public Record getFromReader() {
        if (shutdown) {
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
        boolean isEmpty = (this.bufferIndex >= this.buffer.size());
        if (isEmpty) {
            receive();
        }
        Record record = this.buffer.get(this.bufferIndex++);
        if (record instanceof TerminateRecord) {
            record = null;
        }
        return record;
    }
}
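
The channel used above is, in the standalone case, a MemoryChannel. Conceptually it is little more than a bounded blocking queue plus byte/record counters used for flow control. Below is a stripped-down sketch of that idea, not the actual DataX MemoryChannel, which additionally integrates speed limiting and statistics reporting; SimpleMemoryChannel and its method set are illustrative only.

import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;

import com.alibaba.datax.common.element.Record;

// Minimal in-memory channel sketch: a bounded queue between one reader thread
// and one writer thread. Backpressure comes for free from the blocking queue.
public class SimpleMemoryChannel {

    private final ArrayBlockingQueue<Record> queue;

    public SimpleMemoryChannel(int capacity) {
        this.queue = new ArrayBlockingQueue<Record>(capacity);
    }

    // Called by the reader side (RecordSender.flush -> channel push)
    public void pushAll(Collection<Record> records) throws InterruptedException {
        for (Record record : records) {
            queue.put(record);   // blocks when the channel is full
        }
    }

    // Called by the writer side (RecordReceiver pull)
    public Record pull() throws InterruptedException {
        return queue.take();     // blocks when the channel is empty
    }
}

Throttling is then only a matter of measuring how many bytes and records pass through the push side per time window and sleeping when the configured channel speed is exceeded, which is exactly what the channel speed settings control.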

DataX Performance Optimization

With an understanding of DataX's principles and implementation, it becomes clear how to improve its synchronization efficiency. Taking MySQL-to-HDFS synchronization as an example, the most direct approach is to upgrade the hardware on the MySQL and HDFS sides: CPU, memory, IOPS, network bandwidth and so on. When hardware resources are constrained, the following options help (a sample configuration follows the list):

  1. Put the clusters involved into the same network or region to reduce cross-network instability, e.g. migrate the Aliyun cluster over to the Amazon side, or place the different zones of the same Amazon deployment into one subnet.

  2. Split the source table by primary key. DataX uses a single channel per table by default; specifying a split key (splitPk) greatly increases the concurrency and throughput of the sync.

  3. Increase the channel concurrency as far as CPU, memory and the MySQL load allow. More channels mean more memory overhead, so JVM tuning becomes critical.

  4. When the channel count cannot be raised further and each split is still large, consider splitting each slice once more.

  5. Set appropriate parameters, such as MySQL timeouts.
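
As a concrete reference for points 2 and 3, the relevant knobs live in the job JSON. The snippet below is a sketch using the mysqlreader splitPk key and the global speed settings; the numbers and hostnames are illustrative and should be adjusted to your own hardware and database load.

{
  "job": {
    "setting": {
      "speed": {
        "channel": 16,
        "byte": 10485760
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "splitPk": "id",
            "connection": [
              { "table": ["orders"], "jdbcUrl": ["jdbc:mysql://rds-host:3306/shop"] }
            ]
          }
        }
      }
    ]
  }
}

When raising the channel count, also raise the heap of the DataX JVM process (for example via the --jvm option of datax.py, if your version provides it), since every channel buffers records in memory.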

Summary

Through the walkthrough of principles and source code above, we have clarified DataX's workflow and implementation, and offered a few optimization suggestions drawn from practical experience. In real deployments, sharded databases and extremely large tables also bring the load on the source database into play; ignore it and an unhappy DBA is guaranteed. For cross-ocean synchronization in particular, network stability is another major challenge, and an incremental synchronization scheme may then be the better choice.
