I. Introduction
Some projects need to schedule DataX job scripts with their own task scheduler (such as xxl-job). In that case DataX has to be integrated into the Spring Boot application.
II. Integration Options
There are two fairly simple ways to integrate DataX:
(1) Run DataX as an external command
(2) Call the DataX job engine directly
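III. Integration in Practice
1. Running DataX as an external command
This approach needs only one utility class. However, Python must be available in the environment where the application runs, because the DataX launcher datax.py is a Python script.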
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Arrays;
/**
 * Command execution utility
 */
@Component
public class ExecCommandUtil {
    private static final Logger log = LoggerFactory.getLogger(ExecCommandUtil.class);
    /** Charset of the command output: GBK fits a Chinese Windows console; on Linux UTF-8 is usually correct */
    private static String CHARSET = "GBK";

    @Value("${spring.datax.command.charset:GBK}")
    public void setCharset(String charset) {
        // Spring injects the value through this instance setter into the static field
        ExecCommandUtil.CHARSET = charset;
    }

    public static void execCommand(String param) throws Exception {
        // Split on runs of whitespace. No shell is involved, so quotes are passed to the
        // program literally and quoted arguments containing spaces are NOT supported
        String[] command = param.trim().split("\\s+");
        log.info(Arrays.toString(command));
        long startTime = System.currentTimeMillis();
        ProcessBuilder processBuilder = new ProcessBuilder(command);
        // Merge stderr into stdout so a single reader drains both streams
        processBuilder.redirectErrorStream(true);
        Process process = processBuilder.start();
        int exitValue;
        // Read the command output with the configured charset; the reader is closed automatically
        try (BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), CHARSET))) {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                log.info(line);
            }
            // Wait for the command to exit and take its exit value
            exitValue = process.waitFor();
        } finally {
            // Make sure the child process does not outlive a failed read
            process.destroy();
        }
        log.debug("command took {} ms", System.currentTimeMillis() - startTime);
        // An exit value other than 0 or 3 means the command did not succeed
        if (exitValue != 0 && exitValue != 3) {
            throw new Exception(String.format("command failed, exit value=%s.", exitValue));
        }
    }
}
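ProcessBuilder does not go through a shell, so splitting a command string on spaces cannot reproduce shell quoting: with the py-balfund property used in the test section, the double quotes in -p"-Dversion='8'" reach datax.py literally. A minimal sketch, assuming you would rather build the argument list directly. DataxCommandBuilder and build are hypothetical names, the bin/datax.py location follows the standard DataX layout, and variable values are assumed to contain no spaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/**
 * Hypothetical helper: builds the datax.py invocation as an argument list,
 * so no whitespace splitting or shell quoting is involved.
 */
public class DataxCommandBuilder {

    /**
     * @param pythonBin python executable, e.g. "python" (assumption)
     * @param dataxHome DataX installation directory
     * @param jobJson   path of the job JSON file
     * @param params    job variables for datax.py -p, e.g. version=8 (values assumed space-free)
     */
    public static List<String> build(String pythonBin, String dataxHome,
                                     String jobJson, Map<String, String> params) {
        List<String> command = new ArrayList<>();
        command.add(pythonBin);
        command.add(dataxHome + "/bin/datax.py");
        if (params != null && !params.isEmpty()) {
            // All variables go into ONE argument after -p: "-Dk1=v1 -Dk2=v2"
            StringJoiner joiner = new StringJoiner(" ");
            params.forEach((key, value) -> joiner.add("-D" + key + "=" + value));
            command.add("-p");
            command.add(joiner.toString());
        }
        command.add(jobJson);
        return command;
    }
}
```

The returned list can be passed straight to new ProcessBuilder(command), which accepts a List<String>. Use a LinkedHashMap for params if the order of several variables matters.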
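2. Calling the DataX job engine directly
(1) Add dependencies
Note: before adding the dependencies below, the DataX packages (datax-common and datax-core) must be uploaded to your private repository.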
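<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-common</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-core</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>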
(2) Add configuration
The path is the DataX installation directory covered in the previous article.
## DataX plugin installation path
spring.datax.homepath=/data/datax/datax
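A wrong spring.datax.homepath only surfaces when the first job runs, so a startup check can catch it earlier. A minimal sketch, assuming the standard DataX distribution layout (bin/datax.py, conf/core.json, plugin/reader, plugin/writer). DataxHomeChecker is a hypothetical name, not part of DataX.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical startup check for the directory configured in spring.datax.homepath. */
public class DataxHomeChecker {

    // Entries assumed from the standard DataX distribution layout
    private static final String[] REQUIRED = {
            "bin/datax.py", "conf/core.json", "plugin/reader", "plugin/writer"
    };

    /** Returns the required entries missing under dataxHome; an empty list means the layout looks OK. */
    public static List<String> missingEntries(Path dataxHome) {
        List<String> missing = new ArrayList<>();
        for (String entry : REQUIRED) {
            if (!Files.exists(dataxHome.resolve(entry))) {
                missing.add(entry);
            }
        }
        return missing;
    }
}
```

Calling it once at application startup, for example from a Spring ApplicationRunner, and failing when the list is non-empty would surface a misconfigured path before any job is scheduled.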
(3) Code
- DataxHomePathUtil: utility that sets the DataX home directory as a system property
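import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * DataX home directory utility
 */
@Component
public class DataxHomePathUtil {
    private static final Logger logger = LoggerFactory.getLogger(DataxHomePathUtil.class);
    /**
     * DataX home directory,
     * which holds the plugins and job definition files
     */
    private static String DATAX_PLUGIN_PATH;

    @Value("${spring.datax.homepath:}")
    public void setDataxPluginPath(String dataxPluginPath) {
        // Spring injects the value through this instance setter into the static field
        DataxHomePathUtil.DATAX_PLUGIN_PATH = dataxPluginPath;
    }

    public static void setDataxHomePath() {
        logger.debug("DataX plugin installation directory: {}", DATAX_PLUGIN_PATH);
        // Fail fast instead of letting the engine start with an empty home directory
        if (DATAX_PLUGIN_PATH == null || DATAX_PLUGIN_PATH.isEmpty()) {
            throw new IllegalStateException("spring.datax.homepath is not configured");
        }
        // DataX core resolves its conf and plugin directories from the "datax.home" system property
        System.setProperty("datax.home", DATAX_PLUGIN_PATH);
    }
}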
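As far as I know, DataX core copies datax.home into static constants (such as those in CoreConstant) when those classes are first loaded, so changing the property later has no effect inside the same JVM. Assuming that holds, a guard that sets the property once and rejects a different value afterwards makes such a misconfiguration visible. This is a minimal sketch; DataxHomeGuard and ensure are hypothetical names.

```java
/**
 * Hypothetical guard: sets the "datax.home" system property once and refuses
 * to silently switch it to a different directory afterwards.
 */
public final class DataxHomeGuard {

    private static final String KEY = "datax.home";

    private DataxHomeGuard() {
    }

    public static synchronized void ensure(String path) {
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("DataX home path must not be empty");
        }
        String current = System.getProperty(KEY);
        if (current == null) {
            System.setProperty(KEY, path);
        } else if (!current.equals(path)) {
            // Assumption: DataX has already captured the old value, so a change would be ignored
            throw new IllegalStateException(
                    "datax.home is already " + current + ", refusing to change it to " + path);
        }
    }
}
```

Calling DataxHomeGuard.ensure(path) in place of the bare System.setProperty call keeps repeated job runs idempotent while still flagging a changed path.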
- EngineHelper: utility that runs a job with the DataX engine
import com.alibaba.datax.core.Engine;
import org.springframework.stereotype.Component;
/**
 * DataX job engine utility
 */
@Component
public class EngineHelper {
    /**
     * Runs a DataX job in standalone mode inside the current JVM.
     * @param jobJson path of the job JSON file
     * @throws Throwable whatever the DataX engine throws
     */
    public static void entry(String jobJson) throws Throwable {
        // datax.home must be set before the engine starts
        DataxHomePathUtil.setDataxHomePath();
        String[] dataxArgs = {"-job", jobJson, "-mode", "standalone", "-jobid", "-1"};
        Engine.entry(dataxArgs);
    }
}
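A missing job file otherwise only fails deep inside the engine. A minimal sketch that validates the path before building the same argument array EngineHelper passes to Engine.entry. EngineArgs is a hypothetical name, and the sketch assumes http(s) job locations should be passed through unchecked, since DataX can, as far as I know, also fetch a job file over HTTP.

```java
import java.nio.file.Files;
import java.nio.file.Paths;

/** Hypothetical helper that validates the job path before building Engine.entry arguments. */
public final class EngineArgs {

    private EngineArgs() {
    }

    public static String[] standalone(String jobJson) {
        if (jobJson == null || jobJson.isEmpty()) {
            throw new IllegalArgumentException("job JSON path is empty");
        }
        // Local paths are checked up front; http(s) locations are left to DataX (assumption)
        boolean remote = jobJson.startsWith("http://") || jobJson.startsWith("https://");
        if (!remote && !Files.isRegularFile(Paths.get(jobJson))) {
            throw new IllegalArgumentException("job JSON file not found: " + jobJson);
        }
        // Same arguments as EngineHelper: standalone mode, job id -1
        return new String[]{"-job", jobJson, "-mode", "standalone", "-jobid", "-1"};
    }
}
```

Inside EngineHelper.entry this would replace the inline array: Engine.entry(EngineArgs.standalone(jobJson)).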
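3. Testing
(1) Add configuration
Before adding this configuration, prepare the data synchronization job scripts and upload them to the paths below.
## DataX data synchronization job script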
spring.datax.job.balfund=/data/datax/datax/job/balfund-1.json
## DataX data synchronization command
spring.datax.command.py-balfund=python /data/datax/datax/bin/datax.py -p"-Dversion='8'" /data/datax/datax/job/balfund-clickhouse2.json
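The -p"-Dversion='8'" part of this command passes a variable named version to DataX, which, as I understand DataX's dynamic-parameter support, a job file can reference as ${version} (for example in a reader's where clause). The sketch below is a simplified illustration of that substitution, not DataX's actual code. JobVariableDemo is a hypothetical name, and only the ${name} form is handled.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Simplified illustration (NOT DataX's implementation) of replacing ${name}
 * placeholders in a job JSON with variables passed via -p "-Dname=value".
 */
public class JobVariableDemo {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(\\w+)\\}");

    public static String substitute(String jobJson, Map<String, String> vars) {
        Matcher m = PLACEHOLDER.matcher(jobJson);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            // Unknown variables are left untouched in this sketch
            String value = vars.getOrDefault(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```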
(2) Write a test controller
import com.***.datax.util.EngineHelper;
import com.***.datax.util.ExecCommandUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
// @RestController writes the returned String to the response body;
// with a plain @Controller it would be resolved as a view name
@RestController
@RequestMapping("/datax")
public class DataxController {
    private static final Logger log = LoggerFactory.getLogger(DataxController.class);
    @Value("${spring.datax.job.balfund}")
    private String jobJsonBalfund;
    @Value("${spring.datax.command.py-balfund}")
    private String pyJobBalfund;

    /** Approach 2: run the job with the DataX engine inside this JVM */
    @GetMapping("/test-1")
    public String test1() {
        log.info("Running DataX job via engine: {}", jobJsonBalfund);
        try {
            EngineHelper.entry(jobJsonBalfund);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        return "Execution completed";
    }

    /** Approach 1: run datax.py as an external command */
    @GetMapping("/test-2")
    public String test2() {
        log.info("Running DataX command: {}", pyJobBalfund);
        try {
            ExecCommandUtil.execCommand(pyJobBalfund);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return "Execution completed";
    }
}
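Both endpoints run the DataX job synchronously on the request thread, so a long synchronization keeps the HTTP request open until it finishes. A minimal sketch, assuming you want the request to return immediately: DataxJobLauncher and DataxJob are hypothetical names. It uses a single background thread, a conservative choice because running several DataX engines concurrently in one JVM has not been verified here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical launcher that runs DataX jobs on one background thread,
 * so callers return immediately and jobs never overlap.
 */
public class DataxJobLauncher {

    /** A job body that may throw anything, like EngineHelper.entry(...) */
    @FunctionalInterface
    public interface DataxJob {
        void run() throws Throwable;
    }

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public CompletableFuture<Void> submit(DataxJob job) {
        return CompletableFuture.runAsync(() -> {
            try {
                job.run();
            } catch (Throwable t) {
                // Surface the failure through the returned future
                throw new RuntimeException(t);
            }
        }, executor);
    }

    public void shutdown() {
        executor.shutdown();
    }
}
```

In the controller, test1 could call launcher.submit(() -> EngineHelper.entry(jobJsonBalfund)) and return at once, logging the outcome in a whenComplete callback on the returned future.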