赞
踩
DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
当我们需要传输数据的时候,数据传导组件肯定会想尽办法占用掉设备的资源,但是,随着对DataX深入使用可以发现,DataX并不会全力吃掉资源,所以究竟DataX是如何做到限速的?传输缓慢到底是限速原因还是其他原因?以及该如何优化?本文来一起探讨下。
一、限速
通过查询官网资料,可以得知Datax是通过conf/core.json文件里面的speed方法里面限速的,可以通过调整record记录数和byte字节数来限速。
这个配置可以在Datax源码中的CoreConstant类中找到:
public static final String DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE = "core.transport.channel.speed.byte";
public static final String DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD = "core.transport.channel.speed.record";
接下来,看看这两个配置到底是如何实现限速的。
1. 使用位置
点击两个常量值,可以看到有两个地方使用了这个值:
初始化Channel
public Channel(final Configuration configuration) {
//channel的queue里默认record为1万条。原来为512条
int capacity = configuration.getInt(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY, 2048);
long byteSpeed = configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024);
long recordSpeed = configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000);
if (capacity <= 0) {
throw new IllegalArgumentException(String.format(
"通道容量[%d]必须大于0.", capacity));
}
求最大通道数
// 在byte流控情况下,单个Channel流量最大值必须设置,否则报错!
Long channelLimitedByteSpeed = this.configuration
.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR,
"在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
}
2. 具体限速流程
从上述代码可以看出,在Channel初始化时,顺带初始化了限速的记录数(recordSpeed)以及字节数(byteSpeed) ,点击两个对应的变量,可以看到在statPush方法中使用到:
boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
return;
}
具体流程:
判断byteSpeed(bps)和recordSpeed(tps)是否都大于0?如果不是,则退出。
根据当前的byteSpeed和设定的byteSpeed对比,求出睡眠时间(公式:currentByteSpeed * interval this.byteSpeed- interval;)
根据当前的recordSpeed和设定的recordSpeed对比,求出睡眠时间(公式:currentRecordSpeed * interval / this.recordSpeed - interval;)
取休眠时间最大值。
Thread.sleep(sleepTime)来休眠。
实现限速。
3. statPush完整代码
private void statPush(long recordSize, long byteSize) {
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
recordSize);
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
byteSize);
//在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数
currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);
boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
return;
}
long lastTimestamp = lastCommunication.getTimestamp();
long nowTimestamp = System.currentTimeMillis();
long interval = nowTimestamp - lastTimestamp;
if (interval - this.flowControlInterval >= 0) {
long byteLimitSleepTime = 0;
long recordLimitSleepTime = 0;
if (isChannelByteSpeedLimit) {
long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
if (currentByteSpeed > this.byteSpeed) {
// 计算根据byteLimit得到的休眠时间
byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
- interval;
}
}
if (isChannelRecordSpeedLimit) {
long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
if (currentRecordSpeed > this.recordSpeed) {
// 计算根据recordLimit得到的休眠时间
recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
- interval;
}
}
// 休眠时间取较大值
long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
recordLimitSleepTime : byteLimitSleepTime;
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
lastCommunication.setTimestamp(nowTimestamp);
}
}
二、调优
我们知道,数据传输受两个因素影响,一是网络速度等硬件因素,二是Datax本身的参数,故而当我们觉得Datax传输速度慢时,需要从这两个方面排查。硬件问题有专业人士负责,我们接下来介绍一下Datax本身参数的调优。
1. 全局(提升每个Channel速度)
在DataX内部对每个Channel会有严格的速度控制,分两种,一种是控制每秒同步的记录数,另外一种是每秒同步的字节数,默认的速度限制是1MB/s,可以根据具体硬件情况设置这个byte速度或者record速度,一般设置byte速度,比如:我们可以把单个Channel的速度上限配置为5MB/s,举例:
{
"core":{
"transport":{
"channel":{
"speed":{
"channel": 2, 此处为数据导入的并发度,建议根据服务器硬件进行调优
"record":-1,此处解除对读取行数的限制
"byte":5242880,此处解除对字节的限制
"batchSize":204每次读取batch的大小
}
}
}
},
"job":{
...
}
}
2. 局部(提升Job内Channel并发数)
并发数=taskGroup的数量每一个TaskGroup并发执行的Task数 (默认单个任务组的并发数量为5)。
提升job内Channel并发有三种配置方式:
配置全局Byte限速以及单Channel Byte限速,Channel个数 = 全局Byte限速 / 单Channel Byte限速。
配置全局Record限速以及单Channel Record限速,Channel个数 = 全局Record限速 / 单Channel Record限速。
直接配置Channel个数。
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 1048576 //单个channel byte限速1M/s
}
}
}
},
"job": {
"setting": {
"speed": {
"byte" : 5242880 //总byte限速5M/s
}
},
...
}
}
3. 内存调整
当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json
三、总结
本文介绍了Datax的限速流程,以及如何进行优化,针对Channel进行调整时有注意事项:
若配置了总record限速,则必须配置单个channel的record限速。
若配置了总byte限速,则必须配置单个channel的byte限速。
若配置了总record限速和总byte限速,channel并发数参数就会失效。
在了解基本配置之后,运用于实际操作的时候,要根据情况做出调整,做出符合业务的配置。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。