当前位置:   article > 正文

DataX同步离线数据基础使用教程_datax菜鸟教程

datax菜鸟教程

​ DataX 是阿里云 DataWorks数据集成的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。

DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

1.场景:

这里演示使用DataX实现从mysql抽取数据到stream中。

2.编译Datax:

​ DataX本身没有编译好的releases,所以在使用DataX时,需要自己编译。可以根据业务场景和减少编译时间,将DataX中其他不需要的模块注释掉,本文在这里演示时只需要mysqlreader和streamreader,故而将其他模块注释掉。

2.1 git clone
git clone https://github.com/alibaba/DataX.git
  • 1
cd {DataX_source_code_home} #你clone下来datax的源码目录
  • 1
vim pom.xml
  • 1

在这里插入图片描述

​ 以上我只保留了mysqlreader,streamreader和hdfsreader。

2.2编译:
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
  • 1

在这里插入图片描述

3.简单使用:

编译成功的datax在源码目录下的/target/datax/datax/中。

cd {DataX_source_code_home}/target/datax/datax/ 
  • 1

通过以下命令可以查看配置信息:

 python datax.py -r streamreader -w streamwriter
  • 1

创建第一个stream2stream.json文件 :

vim  stream2stream.json
  • 1
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "sliceRecordCount": 10,
                        "column": [
                            {
                                "type": "long",
                                "value": "10"
                            },
                            {
                                "type": "string",
                                "value": "DataX第一个测试文件"
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

启动DataX:

cd {DataX_source_code_home}/target/datax/datax/bin/
  • 1
python datax.py ./stream2stream.json
  • 1

执行结果如下:

2021-12-17 14:52:21.929 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-12-17 14:52:11
任务结束时刻                    : 2021-12-17 14:52:21
任务总计耗时                    :                 10s
任务平均流量                    :               95B/s
记录写入速度                    :              5rec/s
读出记录总数                    :                  50
读写失败总数                    :                   0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

创建第二个mysql2stream.json文件:

首先在mysql中创建数据库表和样例数据:

CREATE TABLE `order_analysis` (
  `date` varchar(19) DEFAULT NULL,
  `user_src` varchar(9) DEFAULT NULL,
  `order_src` varchar(11) DEFAULT NULL,
  `order_location` varchar(2) DEFAULT NULL,
  `new_order` int DEFAULT NULL,
  `payed_order` int DEFAULT NULL,
  `pending_order` int DEFAULT NULL,
  `cancel_order` int DEFAULT NULL,
  `reject_order` int DEFAULT NULL,
  `good_order` int DEFAULT NULL,
  `report_order` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-12 00:00:00', '广告二维码', 'Android APP', '上海', 15253, 13210, 684, 1247, 1000, 10824, 862);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-14 00:00:00', '微信朋友圈H5页面', 'iOS APP', '广州', 17134, 11270, 549, 204, 224, 10234, 773);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-17 00:00:00', '地推二维码扫描', 'iOS APP', '北京', 16061, 9418, 1220, 1247, 458, 13877, 749);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-17 00:00:00', '微信朋友圈H5页面', '微信公众号', '武汉', 12749, 11127, 1773, 6, 5, 9874, 678);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-18 00:00:00', '地推二维码扫描', 'iOS APP', '上海', 13086, 15882, 1727, 1764, 1429, 12501, 625);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-18 00:00:00', '微信朋友圈H5页面', 'iOS APP', '武汉', 15129, 15598, 1204, 1295, 1831, 11500, 320);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-19 00:00:00', '地推二维码扫描', 'Android APP', '杭州', 20687, 18526, 1398, 550, 213, 12911, 185);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-19 00:00:00', '应用商店', '微信公众号', '武汉', 12388, 11422, 702, 106, 158, 5820, 474);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-20 00:00:00', '微信朋友圈H5页面', '微信公众号', '上海', 14298, 11682, 1880, 582, 154, 7348, 354);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-21 00:00:00', '地推二维码扫描', 'Android APP', '深圳', 22079, 14333, 5565, 1742, 439, 8246, 211);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-22 00:00:00', 'UC浏览器引流', 'iOS APP', '上海', 28968, 18151, 7212, 2373, 1232, 10739, 578);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

创建json文件:

python datax.py -r mysqlreader -w streamwriter
  • 1
vim mysql2stream.json
  • 1
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://localhost:3306/test_db"],
                                "table": ["order_analysis"]
                            }
                        ],
                        "password": "123456",
                        "username": "root",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "utf-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

启动datax:

python datax.py ./mysql2stream.json
  • 1

在这里插入图片描述

以上就是完成DataX使用案例,更多详细信息见:https://github.com/alibaba/DataX/blob/master/userGuid.md

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号