当前位置:   article > 正文

ETL系列:二、SpringBoot集成DataX

springboot集成datax

一、引言

有的项目可能存在一些需求,项目需要使用自己的定时任务调度工具(如xxl-job等)来调度datax任务脚本,这个时候就需要在SpringBoot工程中集成Datax来使用。

二、集成方案

一般有两个比较简单的集成方案:

(1) 执行command命令方式

(2) 调用datax任务执行器方式

三、集成实战

1、执行command命令方式

此方案只需要编写一个工具类即可,但是应用运行环境需要支持python。

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.stereotype.Component;
  5. import java.io.BufferedInputStream;
  6. import java.io.BufferedReader;
  7. import java.io.InputStreamReader;
  8. import java.util.Arrays;
  9. /**
  10. * 命令执行工具类
  11. */
  12. @Component
  13. public class ExecCommandUtil {
  14. private static Logger log = LoggerFactory.getLogger(ExecCommandUtil.class);
  15. private static String CHARSET;
  16. @Value("${spring.datax.command.charset:GBK}")
  17. public void setCharset(String charset) {
  18. this.CHARSET = charset;
  19. }
  20. public static void execCommand(String param) throws Exception {
  21. int exitValue = -1;
  22. String[] command = param.split(" ");
  23. log.info(Arrays.toString(command));
  24. BufferedReader bufferedReader = null;
  25. try {
  26. long startTime = System.currentTimeMillis();
  27. // command process
  28. ProcessBuilder processBuilder = new ProcessBuilder();
  29. processBuilder.command(command);
  30. processBuilder.redirectErrorStream(true);
  31. Process process = processBuilder.start();
  32. BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
  33. // 指定读取流编码
  34. bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream, CHARSET));
  35. // command log
  36. String line;
  37. while ((line = bufferedReader.readLine()) != null) {
  38. log.info(line);
  39. }
  40. // command exit
  41. process.waitFor();
  42. long endTime = System.currentTimeMillis();
  43. log.debug("command execute spend time: {} ms", endTime - startTime);
  44. exitValue = process.exitValue();
  45. } finally {
  46. if (bufferedReader != null) {
  47. bufferedReader.close();
  48. }
  49. }
  50. // 命令退出值exitValue不等于0且不等于3,代表命令未成功执行
  51. if (exitValue != 0 && exitValue != 3) {
  52. throw new Exception(String.format("command is failed, exit value=%s.", exitValue));
  53. }
  54. }
  55. }

2、调用datax任务执行器方式

(1) 添加依赖

注意:添加依赖前,需要先将下面用到的 datax-common、datax-core 两个包安装到本地仓库或上传到私有仓库(构建方式见下文"datax依赖包构建")。

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-test</artifactId>
  8. <scope>test</scope>
  9. </dependency>
  10. <dependency>
  11. <groupId>com.alibaba.datax</groupId>
  12. <artifactId>datax-common</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. </dependency>
  15. <dependency>
  16. <groupId>com.alibaba.datax</groupId>
  17. <artifactId>datax-core</artifactId>
  18. <version>0.0.1-SNAPSHOT</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>junit</groupId>
  22. <artifactId>junit</artifactId>
  23. <scope>test</scope>
  24. </dependency>

(2) datax依赖包构建

  • 下载源码

git clone git@github.com:alibaba/DataX.git

  • 打包上传私服,core与common包在maven下进行安装到本地

mvn install:install-file -DgroupId=com.alibaba.datax -DartifactId=datax-core -Dversion=0.0.1-SNAPSHOT -Dpackaging=jar -Dfile=datax-core-0.0.1-SNAPSHOT.jar

mvn install:install-file -DgroupId=com.alibaba.datax -DartifactId=datax-common -Dversion=0.0.1-SNAPSHOT -Dpackaging=jar -Dfile=datax-common-0.0.1-SNAPSHOT.jar

(3) 添加配置

安装路径就是上篇文章讲的datax安装目录

  1. ## DataX插件安装路径设置
  2. spring.datax.homepath=/data/datax/datax

(4) 编码

  • datax工作目录系统变量设置工具类DataxHomePathUtil
  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * datax工作目录工具类
  7. */
  8. @Component
  9. public class DataxHomePathUtil {
  10. private static Logger logger = LoggerFactory.getLogger(DataxHomePathUtil.class);
  11. /**
  12. * datax工作目录
  13. * 存放插件与job定义文件
  14. */
  15. private static String DATAX_PLUGIN_PATH;
  16. @Value("${spring.datax.homepath:}")
  17. public void setDataxPluginPath(String dataxPluginPath)
  18. {
  19. this.DATAX_PLUGIN_PATH = dataxPluginPath;
  20. }
  21. public static void setDataxHomePath() {
  22. logger.debug("---datax插件安装目录:{}", DATAX_PLUGIN_PATH);
  23. System.setProperty("datax.home", DATAX_PLUGIN_PATH);
  24. }
  25. }
  • DataX任务引擎调用工具类EngineHelper
  1. import com.alibaba.datax.core.Engine;
  2. import org.springframework.stereotype.Component;
  3. /**
  4. * job引擎执行工具类
  5. */
  6. @Component
  7. public class EngineHelper {
  8. /**
  9. * datax任务引擎
  10. * @param jobJson json配置文件路径
  11. * @throws Throwable
  12. */
  13. public static void entry(String jobJson) throws Throwable {
  14. DataxHomePathUtil.setDataxHomePath();
  15. String[] datxArgs2 = {"-job", jobJson, "-mode", "standalone", "-jobid", "-1"};
  16. Engine.entry(datxArgs2);
  17. }
  18. }

3、测试

(1) 添加配置

添加配置前,请准备好数据同步任务脚本,并上传至对应路径。

  1. ## datax数据同步任务脚本
  2. spring.datax.job.balfund=/data/datax/datax/job/balfund-1.json
  3. ## datax数据同步命令
  4. spring.datax.command.py-balfund=python /data/datax/datax/bin/datax.py -p"-Dversion='8'" /data/datax/datax/job/balfund-clickhouse2.json

(2)编写测试类

  import com.***.datax.util.EngineHelper;
  import com.***.datax.util.ExecCommandUtil;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.stereotype.Controller;
  import org.springframework.web.bind.annotation.GetMapping;
  import org.springframework.web.bind.annotation.RequestMapping;
  import org.springframework.web.bind.annotation.RestController;
  9. @Controller
  10. @RequestMapping("/datax")
  11. public class DataxController {
  12. Logger log = LoggerFactory.getLogger(DataxController.class);
  13. @Value("${spring.datax.job.balfund}")
  14. private String jobJsonBalfund;
  15. @Value("${spring.datax.command.py-balfund}")
  16. private String pyJobBalfund;
  17. @GetMapping("/test-1")
  18. public String test1() {
  19. log.info("------------{}", jobJsonBalfund);
  20. try {
  21. EngineHelper.entry(jobJsonBalfund);
  22. } catch (Throwable e) {
  23. throw new RuntimeException(e);
  24. }
  25. return "执行完成";
  26. }
  27. @GetMapping("/test-2")
  28. public String test2() {
  29. log.info("------------{}", jobJsonBalfund);
  30. try {
  31. ExecCommandUtil.execCommand(pyJobBalfund);
  32. } catch (Exception e) {
  33. throw new RuntimeException(e);
  34. }
  35. return "执行完成";
  36. }
  37. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/402154
推荐阅读
相关标签
  

闽ICP备14008679号