赞
踩
目录
官方参考文档:https://github.com/alibaba/DataX/blob/master/userGuid.md
Linux 操作系统
JDK(1.8 及其以上都可以,推荐 1.8):Linux 下安装 JDK 和 Maven 环境_linux安装jdk和maven-CSDN博客
Python(2 或者 3 都可以):Spark-3.2.4 高可用集群安装部署详细图文教程_spark高可用-CSDN博客
Apache Maven 3.x(只有源码编译安装需要):Linux 下安装 JDK 和 Maven 环境_linux安装jdk和maven-CSDN博客
1、下载安装 DataX 工具包,下载地址:https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
2、将下载好的包上传到 Linux 中
3、解压安装即可
(base) [root@hadoop03 ~]# tar -zxvf datax.tar.gz -C /usr/local/
- # python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json
-
- # 例如:
- python /usr/local/datax/bin/datax.py /usr/local/datax/job/job.json
如果执行自检程序出现如下错误:
- [main] WARN ConfigParser - 插件[streamreader,streamwriter]加载失败,1s后重试... Exception:Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/usr/local/datax/plugin/reader/._drdsreader/plugin.json]不存在. 请检查您的配置文件.
- [main] ERROR Engine -
-
- 经DataX智能分析,该任务最可能的错误原因是:
- com.alibaba.datax.common.exception.DataXException: Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/usr/local/datax/plugin/reader/._drdsreader/plugin.json]不存在. 请检查您的配置文件.
- at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
- at com.alibaba.datax.common.util.Configuration.from(Configuration.java:95)
- at com.alibaba.datax.core.util.ConfigParser.parseOnePluginConfig(ConfigParser.java:153)
- at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:125)
- at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
- at com.alibaba.datax.core.Engine.entry(Engine.java:137)
- at com.alibaba.datax.core.Engine.main(Engine.java:204)
解决方案:将 plugin 目录下的所有的以 _ 开头的文件都删除即可
- cd /usr/local/datax/plugin
- find ./* -type f -name ".*er" | xargs rm -rf
DataX 这个项目本身是用 Python2 进行开发的,因此需要使用 Python2 的版本进行执行。但是我们安装的 Python 版本是 3,而且 3 和 2 的语法差异还是比较大的。因此直接使用 python3
去执行的话,会出现问题。
如果需要使用 python3
去执行数据同步的计划,需要修改 bin
目录下的三个 py
文件,将这三个文件中的如下部分修改即可:
print xxx 替换为 print(xxx)
Exception, e 替换为 Exception as e
- # 以 datax.py 为例进行修改
- (base) [root@hadoop03 ~]# cd /usr/local/datax/bin/
- (base) [root@hadoop03 /usr/local/datax/bin]# ls
- datax.py dxprof.py perftrace.py
- (base) [root@hadoop03 /usr/local/datax/bin]# vim datax.py
- print(readerRef)
- print(writerRef)
- jobGuid = 'Please save the following configuration as a json file and use\n python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
- print(jobGuid)
使用 python3 命令执行自检脚本:
(base) [root@hadoop03 /usr/local/datax/bin]# python3 /usr/local/datax/bin/datax.py /usr/local/datax/job/job.json
DataX 的数据同步工作,需要使用 json 文件来保存配置信息,配置 writer、reader 等信息。我们可以使用如下的命令来生成一个配置的 json 模板,在这个模板上进行修改,生成最终的 json文件。
python3 /usr/local/datax/bin/datax.py -r {reader} -w {writer}
将其中的 {reader} 替换成自己想要的 reader 组件名字,将其中的 {writer} 替换成自己想要的 writer 组件名字。
所有的 reader 都存储于 DataX 安装目录下的 plugin/reader
目录下,可以在这个目录下查看:
- (base) [root@hadoop03 /usr/local/datax]# ls
- bin conf job lib log log_perf plugin script tmp
- (base) [root@hadoop03 /usr/local/datax]# ls plugin/reader/
- cassandrareader ftpreader hbase11xsqlreader loghubreader odpsreader otsreader sqlserverreader tsdbreader
- clickhousereader gdbreader hbase20xsqlreader mongodbreader opentsdbreader otsstreamreader starrocksreader txtfilereader
- datahubreader hbase094xreader hdfsreader mysqlreader oraclereader postgresqlreader streamreader
- drdsreader hbase11xreader kingbaseesreader oceanbasev10reader ossreader rdbmsreader tdenginereader
所有的 writer 都存储于 DataX 安装目录下的 plugin/writer
目录下,可以在这个目录下查看:
- (base) [root@hadoop03 /usr/local/datax]# ls plugin/writer/
- adbpgwriter datahubwriter gdbwriter hdfswriter mongodbwriter odpswriter postgresqlwriter streamwriter
- adswriter doriswriter hbase094xwriter hologresjdbcwriter mysqlwriter oraclewriter rdbmswriter tdenginewriter
- cassandrawriter drdswriter hbase11xsqlwriter kingbaseeswriter neo4jwriter oscarwriter selectdbwriter tsdbwriter
- clickhousewriter elasticsearchwriter hbase11xwriter kuduwriter oceanbasev10writer osswriter sqlserverwriter txtfilewriter
- databendwriter ftpwriter hbase20xsqlwriter loghubwriter ocswriter otswriter starrockswriter
例如需要查看 streamreader
和 streamwriter
的配置,可以使用如下操作:
python3 /usr/local/datax/bin/datax.py -r streamreader -w streamwriter
这个命令可以将 json 模板直接打印在控制台上,如果想要以文件的形式保存下来,重定向输出到指定文件:
python3 /usr/local/datax/bin/datax.py -r streamreader -w streamwriter > ~/stream2stream.json
创建 stream2stream.json 文件:
- (base) [root@hadoop03 ~]# mkdir jobs
- (base) [root@hadoop03 ~]# cd jobs/
- (base) [root@hadoop03 ~/jobs]# vim stream2stream.json
- {
- "job": {
- "content": [
- {
- "reader": {
- "name": "streamreader",
- "parameter": {
- "sliceRecordCount": 10,
- "column": [
- {
- "type": "long",
- "value": "10"
- },
- {
- "type": "string",
- "value": "hello,你好,世界-DataX"
- }
- ]
- }
- },
- "writer": {
- "name": "streamwriter",
- "parameter": {
- "encoding": "UTF-8",
- "print": true
- }
- }
- }
- ],
- "setting": {
- "speed": {
- "channel": 5
- }
- }
- }
- }
(base) [root@hadoop03 ~/jobs]# python3 /usr/local/datax/bin/datax.py stream2stream.json
DataX 同步数据的时候需要使用到自己设置的配置文件,其中可以定义同步的方案,通常为 json 的格式。在执行同步方案的时候,有些场景下需要有一些动态的数据。例如:
将 MySQL 的数据同步到 HDFS,多次同步的时候只是表的名字和字段不同。
将 MySQL 的数据增量的同步到 HDFS 或者 Hive 中的时候,需要指定每一次同步的时间。
...
这些时候,如果我们每一次都去写一个新的 json 文件将会非常麻烦,此时我们就可以使用 动态传参
所谓的动态传参,就是在 json 的同步方案中,使用类似变量的方式来定义一些可以改变的参数。在执行同步方案的时候,可以指定这些参数具体的值。
- {
- "job": {
- "content": [
- {
- "reader": {
- "name": "streamreader",
- "parameter": {
- "sliceRecordCount": $TIMES,
- "column": [
- {
- "type": "long",
- "value": "10"
- },
- {
- "type": "string",
- "value": "hello,你好,世界-DataX"
- }
- ]
- }
- },
- "writer": {
- "name": "streamwriter",
- "parameter": {
- "encoding": "UTF-8",
- "print": true
- }
- }
- }
- ],
- "setting": {
- "speed": {
- "channel": 1
- }
- }
- }
- }
在使用到同步方案的时候,可以使用 -D 来指定具体的参数的值。例如在上述的 json 中,我们设置了一个参数 TIMES,在使用的时候,可以指定 TIMES 的值,来动态的设置 sliceRecordCount
的值。
python3 /usr/local/datax/bin/datax.py -p "-DTIMES=3" stream2stream.json
在 DataX 的处理流程中,Job 会被划分成为若干个 Task 并发执行,被不同的 TaskGroup 管理。在每一个 Task 的内部,都由 reader -> channel -> writer
的结构组成,其中 channel 的数量决定了并发度。那么 channel 的数量是怎么指定的?
直接指定 channel 数量
通过 Bps
计算 channel 数量
通过 tps
计算 channel 数量
在同步方案的 json 文件中,我们可以设置 job.setting.speed.channel
来设置 channel 的数量。这是最直接的方式。在这种配置下,channel 的 Bps
为默认的 1MBps,即每秒传输 1MB 的数据。
Bps(Byte per second)是一种非常常见的数据传输速率的表示,在 DataX 中,可以通过参数设置来限制总 Job 的 Bps 以及单个 channel 的Bps,来达到限速和 channel 数量计算的效果。
Job Bps:对一个 Job 进行整体的限速,可以通过 job.setting.speed.byte
进行设置。
channel Bps:对单个 channel 的限速,可以通过 core.transport.channel.speed.byte
进行设置。
tps(transcation per second)是一种很常见的数据传输速率的表示,在 DataX 中,可以通过参数设置来限制总 Job 的 tps 以及单个 channel 的 tps,来达到限速和 channel 数量计算的效果。
Job tps:对一个 Job 进行整体的限速,可以通过 job.setting.speed.record
进行设置。
channel tps:对单个 channel 的限速,可以通过 core.transport.channel.speed.record
进行设置。
如果同时配置了 Bps 和 tps 限制,以小的为准。
只有在 Bps 和 tps 都没有配置的时候,才会以 channel 数量配置为准。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。