赞
踩
有些项目需要使用自己的定时任务调度工具(如 xxl-job 等)来调度 DataX 任务脚本,这时就需要在 Spring Boot 工程中集成 DataX。
一般有两个比较简单的集成方案:
(1) 执行command命令方式
(2) 调用datax任务执行器方式
方案(1)只需要编写一个命令执行工具类即可,但要求应用运行环境已安装 Python。
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
-
- import java.io.BufferedInputStream;
- import java.io.BufferedReader;
- import java.io.InputStreamReader;
- import java.util.Arrays;
-
- /**
- * 命令执行工具类
- */
- @Component
- public class ExecCommandUtil {
- private static Logger log = LoggerFactory.getLogger(ExecCommandUtil.class);
- private static String CHARSET;
- @Value("${spring.datax.command.charset:GBK}")
- public void setCharset(String charset) {
- this.CHARSET = charset;
- }
-
- public static void execCommand(String param) throws Exception {
- int exitValue = -1;
- String[] command = param.split(" ");
- log.info(Arrays.toString(command));
- BufferedReader bufferedReader = null;
- try {
- long startTime = System.currentTimeMillis();
- // command process
- ProcessBuilder processBuilder = new ProcessBuilder();
- processBuilder.command(command);
- processBuilder.redirectErrorStream(true);
-
- Process process = processBuilder.start();
-
- BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
-
- // 指定读取流编码
- bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream, CHARSET));
-
- // command log
- String line;
- while ((line = bufferedReader.readLine()) != null) {
- log.info(line);
- }
-
- // command exit
- process.waitFor();
- long endTime = System.currentTimeMillis();
- log.debug("command execute spend time: {} ms", endTime - startTime);
- exitValue = process.exitValue();
- } finally {
- if (bufferedReader != null) {
- bufferedReader.close();
- }
- }
-
- // 命令退出值exitValue不等于0且不等于3,代表命令未成功执行
- if (exitValue != 0 && exitValue != 3) {
- throw new Exception(String.format("command is failed, exit value=%s.", exitValue));
- }
- }
- }
注意:添加以下依赖前,需要先将 datax-common、datax-core 这两个包安装到本地 Maven 仓库或上传到私有仓库(安装命令见下文)。
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.alibaba.datax</groupId>
- <artifactId>datax-common</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.datax</groupId>
- <artifactId>datax-core</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
git clone git@github.com:alibaba/DataX.git
# Build DataX first so that datax-core-0.0.1-SNAPSHOT.jar and datax-common-0.0.1-SNAPSHOT.jar exist.
# The groupId/artifactId/version used here must match the <dependency> entries in the pom
# (com.alibaba.datax : datax-core / datax-common : 0.0.1-SNAPSHOT), otherwise Maven cannot resolve them.
# NOTE(review): install-file without -DpomFile generates a pom with no dependencies, so DataX's own
# transitive dependencies are not pulled in automatically — confirm they are on the classpath.
mvn install:install-file -DgroupId=com.alibaba.datax -DartifactId=datax-core -Dversion=0.0.1-SNAPSHOT -Dpackaging=jar -Dfile=datax-core-0.0.1-SNAPSHOT.jar
mvn install:install-file -DgroupId=com.alibaba.datax -DartifactId=datax-common -Dversion=0.0.1-SNAPSHOT -Dpackaging=jar -Dfile=datax-common-0.0.1-SNAPSHOT.jar
方案(2)需要先配置 DataX 插件安装路径,即上一篇文章中介绍的 DataX 安装目录:
- ## DataX插件安装路径设置
- spring.datax.homepath=/data/datax/datax
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
-
-
- /**
- * datax工作目录工具类
- */
- @Component
- public class DataxHomePathUtil {
- private static Logger logger = LoggerFactory.getLogger(DataxHomePathUtil.class);
- /**
- * datax工作目录
- * 存放插件与job定义文件
- */
- private static String DATAX_PLUGIN_PATH;
-
- @Value("${spring.datax.homepath:}")
- public void setDataxPluginPath(String dataxPluginPath)
- {
- this.DATAX_PLUGIN_PATH = dataxPluginPath;
- }
-
- public static void setDataxHomePath() {
- logger.debug("---datax插件安装目录:{}", DATAX_PLUGIN_PATH);
- System.setProperty("datax.home", DATAX_PLUGIN_PATH);
- }
-
- }
- import com.alibaba.datax.core.Engine;
- import org.springframework.stereotype.Component;
-
- /**
- * job引擎执行工具类
- */
- @Component
- public class EngineHelper {
- /**
- * datax任务引擎
- * @param jobJson json配置文件路径
- * @throws Throwable
- */
- public static void entry(String jobJson) throws Throwable {
- DataxHomePathUtil.setDataxHomePath();
- String[] datxArgs2 = {"-job", jobJson, "-mode", "standalone", "-jobid", "-1"};
- Engine.entry(datxArgs2);
- }
- }
添加配置前,请准备好数据同步任务脚本,并上传至对应路径。
- ## datax数据同步任务脚本
- spring.datax.job.balfund=/data/datax/datax/job/balfund-1.json
- ## datax数据同步命令
- spring.datax.command.py-balfund=python /data/datax/datax/bin/datax.py -p"-Dversion='8'" /data/datax/datax/job/balfund-clickhouse2.json
import com.***.datax.util.EngineHelper;
import com.***.datax.util.ExecCommandUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
-
-
- @Controller
- @RequestMapping("/datax")
- public class DataxController {
- Logger log = LoggerFactory.getLogger(DataxController.class);
- @Value("${spring.datax.job.balfund}")
- private String jobJsonBalfund;
-
- @Value("${spring.datax.command.py-balfund}")
- private String pyJobBalfund;
-
- @GetMapping("/test-1")
- public String test1() {
- log.info("------------{}", jobJsonBalfund);
- try {
- EngineHelper.entry(jobJsonBalfund);
- } catch (Throwable e) {
- throw new RuntimeException(e);
- }
- return "执行完成";
- }
-
- @GetMapping("/test-2")
- public String test2() {
- log.info("------------{}", jobJsonBalfund);
- try {
- ExecCommandUtil.execCommand(pyJobBalfund);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return "执行完成";
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。