赞
踩
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
设计理念
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
当前使用现状
DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。
Github主页地址:https://github.com/alibaba/DataX
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
准备1台虚拟机
本例系统版本 CentOS-7.8,已安装jdk1.8
关闭防火墙
systemctl stop firewalld
# 下载解压
wget --no-check-certificate https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz
tar -xzvf datax.tar.gz
mv datax /data/datax/
# 进入安装目录
cd /data/datax
# 执行验证脚本
python bin/datax.py job/job.json
执行完成后,大致输出如下结果:
任务启动时刻 : 2024-03-10 00:46:57
任务结束时刻 : 2024-03-10 00:47:07
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0
安装很简单,至此安装成功!
输入以下命令,会生成模板文件,然后再修改
python bin/datax.py -r mysqlreader -w hdfswriter
创建文件 job/mysql2hdfs02.json
,详细内容如下:
该配置文件定义了从一个 MySQL 数据库读取数据,并将这些数据写入到 HDFS 的过程。
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": ["id","name","msg","create_time","status","last_login_time"], "connection": [ { "jdbcUrl": ["jdbc:mysql://192.168.56.1:3306/user?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai"], "table": ["t_user"] } ], "password": "password", "username": "test", "where": "id>3" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ {"name":"id","type":"bigint"}, {"name":"name","type":"string"}, {"name":"msg","type":"string"}, {"name":"create_time","type":"date"}, {"name":"status","type":"string"}, {"name":"last_login_time","type":"date"} ], "compress": "gzip", "defaultFS": "hdfs://hadoop131:9000", "fieldDelimiter": "\t", "fileName": "mysql2hdfs01", "fileType": "text", "path": "/mysql2hdfs", "writeMode": "append" } } } ], "setting": { "speed": { "channel": "1" } } } }
python bin/datax.py job/mysql2hdfs02.json
执行结果如下:
任务启动时刻 : 2024-03-10 02:30:46
任务结束时刻 : 2024-03-10 02:30:57
任务总计耗时 : 11s
任务平均流量 : 12B/s
记录写入速度 : 0rec/s
读出记录总数 : 4
读写失败总数 : 0
1)在页面中查看,可以点击下载,然后使用文本编辑器查看
2)直接使用Hadoop命令查看,可以看见表中的数据已经同步过去了
hadoop fs -cat /mysql2hdfs/* |zcat
输入以下命令,会生成模板文件,然后再修改
python bin/datax.py -r mysqlreader -w hdfswriter
创建文件 job/hdfs2mysql01.json
,详细内容如下:
{ "job": { "content": [ { "reader": { "name": "hdfsreader", "parameter": { "column": [ "*" ], "compress": "gzip", "encoding": "UTF-8", "defaultFS": "hdfs://hadoop131:9000", "fieldDelimiter": "\t", "fileName": "mysql2hdfs01", "fileType": "text", "path": "/mysql2hdfs", "nullFormat": "\\N" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": ["id","name","msg","create_time","status","last_login_time"], "connection": [ { "jdbcUrl": "jdbc:mysql://192.168.56.1:3306/user?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai", "table": ["t_user_test"] } ], "password": "password", "username": "test", "writeMode": "replace" } } } ], "setting": { "speed": { "channel": "1" } } } }
注意,需要预先创建好表
t_user_test
python bin/datax.py job/hdfs2mysql01.json
执行结果如下:
任务启动时刻 : 2024-03-29 17:27:45
任务结束时刻 : 2024-03-29 17:27:56
任务总计耗时 : 10s
任务平均流量 : 13B/s
记录写入速度 : 0rec/s
读出记录总数 : 4
读写失败总数 : 0
DataX的job配置中的reader
、writer
和setting
是构成数据同步任务的关键组件。
reader
是数据同步任务中的数据源读取配置部分,用于指定从哪个数据源读取数据以及如何读取数据。它通常包含以下关键信息:
name
: 读取插件的名称,如mysqlreader
、hdfsreader
等,用于指定从哪种类型的数据源读取数据。parameter
: 具体的读取参数配置,包括数据源连接信息、读取的表或文件路径、字段信息等。示例:
假设要从MySQL数据库读取数据,reader
的配置可能如下:
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "password",
"column": ["id", "name", "age"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://localhost:3306/test_db",
"table": ["test_table"]
}
]
}
}
writer
是数据同步任务中的目标数据源写入配置部分,用于指定将数据写入哪个目标数据源以及如何写入数据。它通常包含以下关键信息:
name
: 写入插件的名称,如mysqlwriter
、hdfswriter
等,用于指定将数据写入哪种类型的数据源。parameter
: 具体的写入参数配置,包括目标数据源连接信息、写入的表或文件路径、字段映射等。示例:
假设要将数据写入HDFS,writer
的配置可能如下:
"writer": {
"name": "hdfswriter",
"parameter": {
"writeMode": "append",
"fieldDelimiter": ",",
"compress": "gzip",
"column": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "age", "type": "int"}],
"connection": [
{
"hdfsUrl": "hdfs://localhost:9000",
"file": ["/user/hive/warehouse/test_table"]
}
]
}
}
setting
是数据同步任务的全局设置部分,用于配置影响整个任务行为的参数。它通常包含以下关键信息:
speed
: 控制数据同步的速度和并发度,包括通道数(channel
)和每个通道的数据传输速度(如byte
)。errorLimit
: 设置数据同步过程中的错误容忍度,包括允许出错的记录数(record
)和错误率(percentage
)。示例:
一个典型的setting
配置可能如下:
"setting": {
"speed": {
"channel": 3, // 并发通道数
"byte": 1048576 // 每个通道的数据传输速度,单位是字节(1MB)
},
"errorLimit": {
"record": 0, // 允许出错的记录数
"percentage": 0.02 // 允许出错的记录数占总记录数的百分比
}
}
综上所述,reader
、writer
和setting
三个部分共同构成了DataX数据同步任务的配置文件。通过合理配置这些部分,用户可以灵活地定义数据源、目标数据源以及数据同步的行为和性能。在实际应用中,用户应根据具体的数据源类型、目标数据源类型和数据同步需求来填写和调整这些配置。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。