赞
踩
github:https://github.com/alibaba/DataX
目录
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
设计理念
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
当前使用现状
DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
核心模块介绍
DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
DataXJob根据分库分表切分成了100个Task。
根据20个并发,DataX计算共需要分配4个TaskGroup。
4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
目前支持的数据源
可靠的数据质量监控
完美解决数据传输个别类型失真问题
DataX旧版对于部分数据类型(比如时间戳)传输一直存在毫秒阶段等数据失真情况,新版本DataX3.0已经做到支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。
提供作业全链路的流量、数据量运行时监控
DataX3.0运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以实时了解作业状态。并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予用户更多性能排查信息。
提供脏数据探测
在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。DataX目前可以实现脏数据精确过滤、识别、采集、展示,为用户提供多种的脏数据处理模式,让用户准确把控数据质量大关!
丰富的数据转换功能
DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。详情请看DataX3的transformer详细介绍。
精准的速度控制
还在为同步过程对在线存储压力影响而担心吗?新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度"speed": { "channel": 5, "byte": 1048576, "record": 10000 }
- "speed": {
- "channel": 5,
- "byte": 1048576,
- "record": 10000
- }
强劲的同步性能
DataX3.0每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡。另外,DataX团队对所有的已经接入的插件都做了极致的性能优化,并且做了完整的性能测试。性能测试相关详情可以参照每单个数据源的详细介绍:DataX数据源指南
健壮的容错机制
DataX作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是DataX的基本要求,在DataX 3.0的设计中,重点完善了框架和插件的稳定性。目前DataX3.0可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。
线程内部重试
DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。
线程级别重试
目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。
极简的使用体验
易用
下载即可用,支持linux和windows,只需要短短几步骤就可以完成数据的传输。请点击:Quick Start
详细
DataX在运行日志中打印了大量信息,其中包括传输速度,Reader、Writer性能,进程CPU,JVM和GC情况等等。
传输过程中打印传输速度、进度等
传输过程中会打印进程相关的CPU、JVM等
在任务结束之后,打印总体运行情况
datax适合在异构数据库/文件系统之间高速交换数据。 主要解决批量同步问题,无法满足多数增量同步和实时同步的需求(可以实现微批次的伪实时),且不适合超大数据量的全量同步。
python方式调用例子 : python D:\JAVA\datax\bin\datax.py -j"-Xms1024m -Xmx1024m" D:\JAVA\datax\bin\json123.conf
java方式调用:java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\JAVA\datax/log -Xms1024m -Xmx1024m -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=D:\JAVA\datax -Dlogback.configurationFile=D:\JAVA\datax/conf/logback.xml -classpath D:\JAVA\datax/lib/* -Dlog.file.name=5ffabc799aec48f_conf com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job D:\JAVA\datax\bin\json123.conf
json案例(mysql2mysql):
- {
- "job": {
- "setting": {
- "speed": {
- "channel": 10 },
- "errorLimit": {
- "record": 0,
- "percentage": 0.02
- }
- },
- "content": [{
- "reader": {
- "name": "mysqlreader",
- "parameter": {
- "username": "remote",
- "password": "hangzhoumeiri",
- "column": ["`id`", "`taskName`", "`templateId`", "`paramJson`", "`userId`", "`status`", "`jobs`", "`stage`", "`progress`", "`dataFilePath`", "`is_deleted`", "`create_time`", "`last_update_time`", "`result`", "`job_ids`", "`syn`", "`end_run_time`", "`start_run_time`", "`expect_execute_time`", "`task_info`", "`scheduler_task_id`"],
- "splitPk": "id",
- "connection": [{
- "table": ["task"],
- "jdbcUrl": ["jdbc:mysql://192.168.5.191:3306/data_operation"]
- }]
- }
- },
- "writer": {
- "name": "mysqlwriter",
- "parameter": {
- "username": "remote",
- "password": "hangzhoumeiri",
- "column": ["`id`", "`taskName`", "`templateId`", "`paramJson`", "`userId`", "`status`", "`jobs`", "`stage`", "`progress`", "`dataFilePath`", "`is_deleted`", "`create_time`", "`last_update_time`", "`result`", "`job_ids`", "`syn`", "`end_run_time`", "`start_run_time`", "`expect_execute_time`", "`task_info`", "`scheduler_task_id`"],
- "preSql": [""],
- "connection": [{
- "table": ["task"],
- "jdbcUrl": "jdbc:mysql://192.168.11.12:3306/test"
- }]
- }
- }
- }]
- }
- }
具体的规则请参考github
- {
- "job": {
- "setting": {
- "speed": {
- "channel": 10 },
- "errorLimit": {
- "record": 0,
- "percentage": 0.02
- }
- },
- "content": [{
- "reader": {
- "name": "mysqlreader",
- "parameter": {
- "username": "remote",
- "password": "hangzhoumeiri",
- "column": ["`id`", "`taskName`", "`templateId`", "`paramJson`", "`userId`", "`status`", "`jobs`", "`stage`", "`progress`", "`dataFilePath`", "`is_deleted`", "`create_time`", "`last_update_time`", "`result`", "`job_ids`", "`syn`", "`end_run_time`", "`start_run_time`", "`expect_execute_time`", "`task_info`", "`scheduler_task_id`"],
- "splitPk": "id",
- "connection": [{
- "table": ["task"],
- "jdbcUrl": ["jdbc:mysql://192.168.5.191:3306/data_operation"]
- }]
- }
- },
- "writer": {
- "name": "mysqlwriter",
- "parameter": {
- "username": "remote",
- "password": "hangzhoumeiri",
- "column": ["`id`", "`taskName`", "`templateId`", "`paramJson`", "`userId`", "`status`", "`jobs`", "`stage`", "`progress`", "`dataFilePath`", "`is_deleted`", "`create_time`", "`last_update_time`", "`result`", "`job_ids`", "`syn`", "`end_run_time`", "`start_run_time`", "`expect_execute_time`", "`task_info`", "`scheduler_task_id`"],
- "preSql": [""],
- "connection": [{
- "table": ["task"],
- "jdbcUrl": "jdbc:mysql://192.168.11.12:3306/test"
- }]
- }
- },
- "transformer": [
- {
- "name": "dx_substr",
- "parameter":
- {
- "columnIndex":1,
- "paras":["1","3"]
- }
- }
- ]
- }]
- }
- }
- 1、dx_substr
- 参数:3个
- 第一个参数:字段编号,对应record中第几个字段。
- 第二个参数:字段值的开始位置。
- 第三个参数:目标字段长度。
- 返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
- 举例:
- dx_substr(1,"2","5") column 1的value为“dataxTest”=>"taxTe"
- dx_substr(1,"5","10") column 1的value为“dataxTest”=>"Test"
-
-
- 2、dx_pad
- 参数:4个
- 第一个参数:字段编号,对应record中第几个字段。
- 第二个参数:"l","r", 指示是在头进行pad,还是尾进行pad。
- 第三个参数:目标字段长度。
- 第四个参数:需要pad的字符。
- 返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符
- 举例:
- dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz
- dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz
-
-
- 3、dx_replace
- 参数:4个
- 第一个参数:字段编号,对应record中第几个字段。
- 第二个参数:字段值的开始位置。
- 第三个参数:需要替换的字段长度。
- 第四个参数:需要替换的字符串。
- 返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
- 举例:
- dx_replace(1,"2","4","****") column 1的value为“dataxTest”=>"da****est"
- dx_replace(1,"5","10","****") column 1的value为“dataxTest”=>"data****"
-
-
- 4、dx_filter (关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。)
- 参数:
- 第一个参数:字段编号,对应record中第几个字段。
- 第二个参数:运算符,支持一下运算符:like, not like, >, =, <, >=, !=, <=
- 第三个参数:正则表达式(java正则表达式)、值。
- 返回:
- 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果.
- like , not like是将字段转换成String,然后和目标正则表达式进行全匹配。
- , =, <, >=, !=, <= 对于DoubleColumn比较double值,对于LongColumn和DateColumn比较long值,其他StringColumn,BooleanColumn以及ByteColumn均比较的是StringColumn值。
-
- 如果目标colunn为空(null),对于 = null的过滤条件,将满足条件,被过滤。!=null的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和not like,字段为null满足条件,被过滤。
- 举例:
- dx_filter(1,"like","dataTest")
- dx_filter(1,">=","10")
-
-
- 5、dx_groovy
- 参数。
- 第一个参数: groovy code
- 第二个参数(列表或者为空):extraPackage
- 备注:
- dx_groovy只能调用一次。不能多次调用。
- groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
- groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
- 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充):
- 举例:
-
-
- groovy 实现的subStr:
- String code = "Column column = record.getColumn(1);\n" +
- " String oriValue = column.asString();\n" +
- " String newValue = oriValue.substring(0, 3);\n" +
- " record.setColumn(1, new StringColumn(newValue));\n" +
- " return record;";
- dx_groovy(record);
-
- groovy 实现的Replace
- String code2 = "Column column = record.getColumn(1);\n" +
- " String oriValue = column.asString();\n" +
- " String newValue = \"****\" + oriValue.substring(3, oriValue.length());\n" +
- " record.setColumn(1, new StringColumn(newValue));\n" +
- " return record;";
-
- groovy 实现的Pad
- String code3 = "Column column = record.getColumn(1);\n" +
- " String oriValue = column.asString();\n" +
- " String padString = \"12345\";\n" +
- " String finalPad = \"\";\n" +
- " int NeedLength = 8 - oriValue.length();\n" +
- " while (NeedLength > 0) {\n" +
- "\n" +
- " if (NeedLength >= padString.length()) {\n" +
- " finalPad += padString;\n" +
- " NeedLength -= padString.length();\n" +
- " } else {\n" +
- " finalPad += padString.substring(0, NeedLength);\n" +
- " NeedLength = 0;\n" +
- " }\n" +
- " }\n" +
- " String newValue= finalPad + oriValue;\n" +
- " record.setColumn(1, new StringColumn(newValue));\n" +
- " return record;";
- 1、在com.alibaba.datax.core.transport.transformer下新增Transformer类继承Transformer抽象类并实现evaluate方法以及构造方法确认transformerName;
- 2、在com.alibaba.datax.core.transport.transformer下的TransformerRegistry类的static方法中将刚刚新增的transformer类的对象注册到registedTransformer中;
-
-
- 例子如下:
- 1、新增一个JqkTransformer类,将特定下标列的字段改写成jqk这个字符串
-
- public class JqkTransformer extends Transformer {
-
- public JqkTransformer() {
- setTransformerName("dx_jqk");
- }
- @Override
- public Record evaluate(Record record, Object... paras) {
-
- int columnIndex;
- try {
- if (paras.length != 1) {
- throw new RuntimeException("dx_jqk paras must be 4");
- }
-
- columnIndex = (Integer) paras[0];
- } catch (Exception e) {
- throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER, "paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());
- }
-
-
- record.setColumn(columnIndex, new StringColumn("jqk"));
- return record;
- }
- }
-
-
- 2、将JqkTransformer类的对象注册到registedTransformer中
-
- static {
- /**
- * add native transformer
- * local storage and from server will be delay load.
- */
-
- // JqkTransformer注册进去
- registTransformer(new JqkTransformer());
- registTransformer(new SubstrTransformer());
- registTransformer(new PadTransformer());
- registTransformer(new ReplaceTransformer());
- registTransformer(new FilterTransformer());
- registTransformer(new GroovyTransformer());
-
- }
-
- 3、transformer使用
- {
- "job": {
- "setting": {
- "speed": {
- "channel": 10
- },
- "errorLimit": {
- "record": 0,
- "percentage": 0.02
- }
- },
- "content": [{
- "reader": {
- "name": "mysqlreader",
- "parameter": {
- "username": "remote",
- "password": "hangzhoumeiri",
- "column": ["`id`", "`taskName`", "`templateId`", "`paramJson`", "`userId`", "`status`", "`jobs`", "`stage`", "`progress`", "`dataFilePath`", "`is_deleted`", "`create_time`", "`last_update_time`", "`result`", "`job_ids`", "`syn`", "`end_run_time`", "`start_run_time`", "`expect_execute_time`", "`task_info`", "`scheduler_task_id`"],
- "splitPk": "id",
- "connection": [{
- "table": ["task"],
- "jdbcUrl": ["jdbc:mysql://192.168.5.191:3306/data_operation"]
- }]
- }
- },
- "writer": {
- "name": "mysqlwriter",
- "parameter": {
- "username": "remote",
- "password": "hangzhoumeiri",
- "column": ["`id`", "`taskName`", "`templateId`", "`paramJson`", "`userId`", "`status`", "`jobs`", "`stage`", "`progress`", "`dataFilePath`", "`is_deleted`", "`create_time`", "`last_update_time`", "`result`", "`job_ids`", "`syn`", "`end_run_time`", "`start_run_time`", "`expect_execute_time`", "`task_info`", "`scheduler_task_id`"],
- "preSql": [""],
- "connection": [{
- "table": ["task"],
- "jdbcUrl": "jdbc:mysql://192.168.11.12:3306/test"
- }]
- }
- },
- "transformer": [
- {
- "name": "dx_substr",
- "parameter":
- {
- "columnIndex":1,
- "paras":["1","3"]
- }
- },{
- "name": "dx_jqk",
- "parameter":
- {
- "columnIndex":3
- }
- }
- ]
- }]
- }
- }
- job.setting.speed.channel : channel并发数
-
- job.setting.speed.record : 全局配置channel的record限速(默认值为100000)
-
- job.setting.speed.byte:全局配置channel的byte限速 (默认值为10M)
-
- core.transport.channel.speed.record:单channel的record限速
-
- core.transport.channel.speed.byte:单channel的byte限速
-
- ChannelNum =${job.setting.speed.byte} / ${core.transport.channel.speed.byte} = ${job.setting.speed.record} / ${core.transport.channel.speed.record} = ${job.setting.speed.channel}
这里涉及到2个文件,一个是../datax/conf/core.json,这个文件种可以配置core.transport.channel.speed.record和core.transport.channel.speed.byte这两个参数;另一个是用户自己提交的datax执行的json文件,里面可以配置job.setting.speed.channel、job.setting.speed.record、job.setting.speed.byte三个参数。
channelNum 优先根据byte和record的配置决定,其次由channel配置决定,逻辑伪码如下
- int needChannelNumberByByte = Integer.MAX_VALUE;
- int needChannelNumberByRecord = Integer.MAX_VALUE;
-
- needChannelNumberByByte =
- (int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
-
- needChannelNumberByRecord =
- (int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
-
- // 取较小值
- this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
- needChannelNumberByByte : needChannelNumberByRecord;
-
- if (this.needChannelNumber < Integer.MAX_VALUE) {
- return;
- }
-
- this.needChannelNumber = this.configuration.getInt(
- CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);
举个例子(这里配置了全局byte限速是200K,单个channel限速在core.json里配了是100K)
- {
- "job": {
- "setting": {
- "speed": {
- "byte": 204800
- },
- "errorLimit": {
- "record": 0,
- "percentage": 0.02
- }
- },
- "content": [{
- "reader": {
- "name": "mysqlreader",
- "parameter": {
- "username": "remote",
- "password": "hangzhoumeiri",
- "column": ["`id`", "`taskName`", "`templateId`", "`paramJson`", "`userId`", "`status`", "`jobs`", "`stage`", "`progress`", "`dataFilePath`", "`is_deleted`", "`create_time`", "`last_update_time`", "`result`", "`job_ids`", "`syn`", "`end_run_time`", "`start_run_time`", "`expect_execute_time`", "`task_info`", "`scheduler_task_id`"],
- "splitPk": "id",
- "connection": [{
- "table": ["task"],
- "jdbcUrl": ["jdbc:mysql://192.168.5.191:3306/data_operation"]
- }]
- }
- },
- "writer": {
- "name": "mysqlwriter",
- "parameter": {
- "username": "remote",
- "password": "hangzhoumeiri",
- "column": ["`id`", "`taskName`", "`templateId`", "`paramJson`", "`userId`", "`status`", "`jobs`", "`stage`", "`progress`", "`dataFilePath`", "`is_deleted`", "`create_time`", "`last_update_time`", "`result`", "`job_ids`", "`syn`", "`end_run_time`", "`start_run_time`", "`expect_execute_time`", "`task_info`", "`scheduler_task_id`"],
- "preSql": [""],
- "connection": [{
- "table": ["task"],
- "jdbcUrl": "jdbc:mysql://192.168.11.12:3306/test"
- }]
- }
- }
- }]
- },
- "core": {
- "dataXServer": {
- "address": "http://localhost:7001/api",
- "timeout": 10000,
- "reportDataxLog": false,
- "reportPerfLog": false
- },
- "transport": {
- "channel": {
- "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
- "speed": {
- "byte": 102400
- },
- "flowControlInterval": 20,
- "capacity": 512,
- "byteCapacity": 67108864
- },
- "exchanger": {
- "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
- "bufferSize": 32
- }
- },
- "container": {
- "job": {
- "reportInterval": 10000
- },
- "taskGroup": {
- "channel": 5
- },
- "trace": {
- "enable": "true"
- }
-
- },
- "statistics": {
- "collector": {
- "plugin": {
- "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
- "maxDirtyNumber": 10
- }
- }
- }
- }
- }
1、使用的python版本目前必须为2.x,最好是2.6.x;当python版本为3.x时,会报错(主要是print语法)
1、目前开源版本只支持单机模式,需要依赖调度系统(后面会有文章介绍依赖azkaban调度的方式,在每个节点上部署客户端)
2、写入hdfs时,写入的hdfs路径必须存在(可以后期修改源代码,或者使用脚本生成)
3、同一个配置文件中不支持同时写入不同的数据源
datax默认支持的方式就是全量同步
datax目前仅对部分数据源支持了增量同步,以mysql为例,需要表存在create_time字段,例子如下
- {
- "job": {
- "setting": {
- "speed": {
- "channel": 16
- }
- },
- "content": [
- {
- "reader": {
- "name": "oraclereader",
- "parameter": {
- "splitPk": "id",
- "username": "yibo",
- "password": "yibo123",
- "column": [
- "*"
-
- ],
- "connection": [
- {
-
- "jdbcUrl": [
- "jdbc:oracle:thin:@10.91.1.5:1521:gzfy"
- ],
- "querySql": [
- "select * from tb_lis_indicators where trunc(create_date) > to_date(${createdate}, 'yyyy/mm/dd') "
- ]
- }
- ]
- }
- },
- "writer": {
- "name": "mysqlwriter",
- "parameter": {
-
- "username": "root",
- "password": "123456",
- "column": [
- "*"
- ],
- "batchSize": "4096",
- "connection": [
- {
- "jdbcUrl": "jdbc:mysql://10.91.1.4:3306/yibo",
- "table": [
- "TB_LIS_INDICATORS"
- ]
- }
- ]
- }
- }
- }
- ]
- }
- }
调用:python datax.py -p "-Dcreatedate='2019/03/1'" datax.json
第一种方案 : Mysql binlog -> Canal -> Kafka → datax KafkaReader -> datax Writer -> other source
缺点比较明显,只支持mysql到其他数据源的同步
KafkaReader 参考:https://github.com/crabo/kafka-sqlreader-datax
第二种方案:借助FlinkX(https://github.com/DTStack/flinkx)
优点:
缺点
参考:
https://blog.csdn.net/lw277232240/article/details/90903251
https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
总结:
1、github下载datax源码。
2、本地解压导入IDEA。
3、按照他原有的module创建需要的module,比如redisHashreader。
4、将任意原有模块的plugin.json和plugin_job_template.json拷贝到redisHashreader的resource目录下,并进行修改。
5、修改pom.xml,新增要实现的数据库或文件系统需要的依赖。
6、修改redisHashreader中的package.xml,将其中的reader改成redisHashreader。
7、在最外层的package.xml加上redisHashreader相应的fileSet元素。
8、定义好自己的reader需要用户传哪些参数进行处理,并实现redisHashreader类继承Reader抽象类,重写init()和split()方法,新增静态内部Task类继承Reader.Task抽象类,重写init()和startRead()方法
9、maven命令打包项目模块
10、打包后将相应的包上传到集群的datax对应的plugin下的相应目录
11、写好配置json并通过python命令或者java命令运行
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。