当前位置:   article > 正文

大数据热度面试题:Datax限速与调优_datax同步数据到本地比较慢

datax同步数据到本地比较慢

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

 

当我们需要传输数据的时候,数据传导组件肯定会想尽办法占用掉设备的资源,但是,随着对DataX深入使用可以发现,DataX并不会全力吃掉资源,所以究竟DataX是如何做到限速的?传输缓慢到底是限速原因还是其他原因?以及该如何优化?本文来一起探讨下。

一、限速

通过查询官网资料,可以得知Datax是通过conf/core.json文件里面的speed方法里面限速的,可以通过调整record记录数和byte字节数来限速。

这个配置可以在Datax源码中的CoreConstant类中找到:

public static final String DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE = "core.transport.channel.speed.byte";

public static final String DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD = "core.transport.channel.speed.record";

接下来,看看这两个配置到底是如何实现限速的。

1. 使用位置

点击两个常量值,可以看到有两个地方使用了这个值:

初始化Channel

public Channel(final Configuration configuration) {

//channel的queue里默认record为1万条。原来为512条

int capacity = configuration.getInt(

CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY, 2048);

long byteSpeed = configuration.getLong(

CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024);

long recordSpeed = configuration.getLong(

CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000);

if (capacity <= 0) {

throw new IllegalArgumentException(String.format(

"通道容量[%d]必须大于0.", capacity));

}

求最大通道数

// 在byte流控情况下,单个Channel流量最大值必须设置,否则报错!

Long channelLimitedByteSpeed = this.configuration

.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);

if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {

throw DataXException.asDataXException(

FrameworkErrorCode.CONFIG_ERROR,

"在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");

}

2. 具体限速流程

从上述代码可以看出,在Channel初始化时,顺带初始化了限速的记录数(recordSpeed)以及字节数(byteSpeed) ,点击两个对应的变量,可以看到在statPush方法中使用到:

boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);

boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);

if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {

return;

}

具体流程:

判断byteSpeed(bps)和recordSpeed(tps)是否都大于0?如果不是,则退出。

根据当前的byteSpeed和设定的byteSpeed对比,求出睡眠时间(公式:currentByteSpeed * interval this.byteSpeed- interval;)

根据当前的recordSpeed和设定的recordSpeed对比,求出睡眠时间(公式:currentRecordSpeed * interval / this.recordSpeed - interval;)

取休眠时间最大值。

Thread.sleep(sleepTime)来休眠。

实现限速。

3. statPush完整代码

private void statPush(long recordSize, long byteSize) {

currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,

recordSize);

currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,

byteSize);

//在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数

currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);

currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);

boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);

boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);

if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {

return;

}

long lastTimestamp = lastCommunication.getTimestamp();

long nowTimestamp = System.currentTimeMillis();

long interval = nowTimestamp - lastTimestamp;

if (interval - this.flowControlInterval >= 0) {

long byteLimitSleepTime = 0;

long recordLimitSleepTime = 0;

if (isChannelByteSpeedLimit) {

long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -

CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;

if (currentByteSpeed > this.byteSpeed) {

// 计算根据byteLimit得到的休眠时间

byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed

- interval;

}

}

if (isChannelRecordSpeedLimit) {

long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -

CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;

if (currentRecordSpeed > this.recordSpeed) {

// 计算根据recordLimit得到的休眠时间

recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed

- interval;

}

}

// 休眠时间取较大值

long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?

recordLimitSleepTime : byteLimitSleepTime;

if (sleepTime > 0) {

try {

Thread.sleep(sleepTime);

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

}

}

lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,

currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));

lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,

currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));

lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,

currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));

lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,

currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));

lastCommunication.setTimestamp(nowTimestamp);

}

}

二、调优

我们知道,数据传输受两个因素影响,一是网络速度等硬件因素,二是Datax本身的参数,故而当我们觉得Datax传输速度慢时,需要从这两个方面排查。硬件问题有专业人士负责,我们接下来介绍一下Datax本身参数的调优。

1. 全局(提升每个Channel速度)

在DataX内部对每个Channel会有严格的速度控制,分两种,一种是控制每秒同步的记录数,另外一种是每秒同步的字节数,默认的速度限制是1MB/s,可以根据具体硬件情况设置这个byte速度或者record速度,一般设置byte速度,比如:我们可以把单个Channel的速度上限配置为5MB/s,举例:

{

"core":{

"transport":{

"channel":{

"speed":{

"channel": 2, 此处为数据导入的并发度,建议根据服务器硬件进行调优

"record":-1,此处解除对读取行数的限制

"byte":5242880,此处解除对字节的限制

"batchSize":204每次读取batch的大小

}

}

}

},

"job":{

...

}

}

2. 局部(提升Job内Channel并发数)

并发数=taskGroup的数量每一个TaskGroup并发执行的Task数 (默认单个任务组的并发数量为5)。

提升job内Channel并发有三种配置方式:

配置全局Byte限速以及单Channel Byte限速,Channel个数 = 全局Byte限速 / 单Channel Byte限速。

配置全局Record限速以及单Channel Record限速,Channel个数 = 全局Record限速 / 单Channel Record限速。

直接配置Channel个数。

{

"core": {

"transport": {

"channel": {

"speed": {

"byte": 1048576 //单个channel byte限速1M/s

}

}

}

},

"job": {

"setting": {

"speed": {

"byte" : 5242880 //总byte限速5M/s

}

},

...

}

}

3. 内存调整

当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。

建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。

调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:

python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json

三、总结

本文介绍了Datax的限速流程,以及如何进行优化,针对Channel进行调整时有注意事项:

若配置了总record限速,则必须配置单个channel的record限速。

若配置了总byte限速,则必须配置单个channel的byte限速。

若配置了总record限速和总byte限速,channel并发数参数就会失效。

在了解基本配置之后,运用于实际操作的时候,要根据情况做出调整,做出符合业务的配置。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/124574
推荐阅读
相关标签
  

闽ICP备14008679号