赞
踩
目录
2.3.1 主键自增进行增量同步(MySQL->MySQL)
2.3.2 时间自增进行增量同步(MySQL->MySQL)
1、mysql <-> mysql <-> hive <-> hive
从github(https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz)
下载安装包或者下载源码打包(在这里使用下载安装包的方式),上传到服务器运行tar –zxvf安装
datax/job文件夹下自带一个简单job任务,可以以此测试是否安装成功,
python /usr/datax/bin/datax.py ../job/job.json。具体使用在后文中介绍。
(1)Description:[DataX引擎配置错误,该问题通常是由于DataX安装错误引起,请联系您的运维解决.].- 获取作业配置信息失败:/usr/datax/job/job,json - java.io.FileNotFoundException: File '/usr/datax/job/job,json' does not exist
rm -rf /usr/datax/plugin/*/._* 删除插件文件夹中的隐藏文件
(2)Description:[DataX引擎配置错误,该问题通常是由于DataX安装错误引起,请联系您的运维解决.]. - 在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数
修改datax/conf/core.json, core -> transport -> channel -> speed -> "byte": 2000000,将单个channel的大小改为2MB即可。
(1)MySQL (5.5+) 必选,对应客户端可以选装, Linux服务上若安装mysql的客户端可以通过部署脚本快速初始化数据库
(2)JDK (1.8.0_xxx) 必选
(3)Maven (3.6.1+) 必选
(4)DataX 必选
(5)Python (2.x) (支持Python3需要修改替换datax/bin下面的三个python文件,替换文件在doc/datax-web/datax-python3下) 必选,主要用于调度执行底层DataX的启动脚本,默认的方式是以Java子进程方式执行DataX,用户可以选择以Python方式来做自定义的改造
打包 mvn clean install
将jar包上传到服务器并tar –zxvf安装,
进入datax-web-2.1.2/bin ,bash运行install.sh进行交互式安装,交互式安装会依次解压各模块的package以及调用configure配置脚本,可以逐步判断安装包是否有误。
如果服务上有MySQL,则执行安装脚本过程中会出现:
Scan out mysql command, so begin to initalize the database
Do you want to initalize database with sql: [{INSTALL_PATH}/bin/db/datax-web.sql]? (Y/N)y
Please input the db host(default: 127.0.0.1):
Please input the db port(default: 3306):
Please input the db username(default: root):
Please input the db password(default: ):
Please input the db name(default: exchangis)
可以输入数据库地址,端口号,用户名,密码以及数据库名称,如果没有安装MySQL,则可以取用目录下/bin/db/datax-web.sql脚本手动执行,完成后修改相关配置文件
vi ./modules/datax-admin/conf/bootstrap.properties
#Database
#DB_HOST=
#DB_PORT=
#DB_USERNAME=
#DB_PASSWORD=
#DB_DATABASE=
注意MySQL-version务必高于5.5,实际测试中版本过低会导致配置表创建出错
在项目目录: /modules/datax-admin/bin/env.properties 配置邮件服务(可跳过)
MAIL_USERNAME=""
MAIL_PASSWORD=""
此文件中包括一些默认配置参数,例如:server.port,具体请查看文件。
在项目目录下/modules/datax-execute/bin/env.properties 指定PYTHON_PATH的路径
vi ./modules/{module_name}/bin/env.properties
执行datax的python脚本地址:PYTHON_PATH=
保持和datax-admin服务的端口一致;默认是9527,如果没改datax-admin的端口,可以忽略:DATAX_ADMIN_PORT=
此文件中包括一些默认配置参数,例如:executor.port,json.path,data.path等,具体请查看文件。
启动服务bash bin/start-all.sh
可以通过start.sh m {moule_name}单一启动某一模块服务。
停止同理
jps查看是否出现DataXAdminApplication和DataXExecutorApplication进程
部署完成后,在浏览器中输入http://{ip}:{port}/index.html就可以访问对应的主界面(ip为datax-admin部署所在服务器ip,port为datax-admin 指定的运行端口)
输入用户名 admin 密码 123456 就可以直接访问系统
还可在多设备上部署以维持负载均衡,防止单节点挂掉导致任务停摆。
通过Spring框架在windows运行
实现MySQL间数据同步。
注意需要在数据库连接后面添加如下字符编码规则参数,否则表中汉字不能识别
?useUnicode=true&characterEncoding=utf8
由于构建过程全程可以由web进行可视化操作,细节不再赘述,详见文末实例。
相比于全量同步每次都把整张表同步一次,增量同步只更新新增的部分,全量同步在某些情况下效率较低,例如某张表数据量较大,但是每天数据的变化比例很低,若对其采用每日全量同步,则会重复同步和存储大量相同的数据。
DataX在数据增量同步方面比较欠缺,DataX-Web在这方面做了补足。
① 任务类型选DataX任务
② 辅助参数选择主键自增
③ 增量主键开始ID选择,即sql中查询ID的开始ID,用户使用此选项方便第一次的全量同步。第一次同步完成后,该ID被更新为上一次的任务触发时最大的ID,任务失败不更新。
④ 增量时间字段,-DstartId='%s' -DendId='%s' 先来解析下这段字符串
1、-D是DataX参数的标识符,必配2、-D后面的startId和endId是DataX json中where条件的id字段标识符,必须和json中的变量名称保持一致,endId是任务在每次执行时获取当前表maxId,也是下一次任务的startId3、='%s'是项目用来去替换时间的占位符,比配并且格式要完全一致4、注意-DstartId='%s'和-DendId='%s' 中间有一个空格,空格必须保留并且是一个空格5、reader数据源,选择任务同步的读数据源6、配置reader数据源中需要同步数据的表名及该表的主键
将json中原本的colum去掉,改为querySql(关于querySql、querySql、preSql、postSql、splitPk等配置参数,详见datax(27):不太常见配置项querySql、preSql、postSql、splitPk[通俗易懂] - 全栈程序员必看)
此处的关键点在${startId},${endId},${}是DataX动态参数的固定格式,startId,endId就是我们页面配置中 -DstartId='%s' -DendId='%s'中的startId,endId,注意字段一定要一致。
由4.3.1中④可知,endId是任务在每次执行时获取当前表startId,也是下一次任务startId,每次任务,endId和startId的变化过程都是:
1、自定义初始startId
2、endId获取reader表中主键最大值
3、目标表更新主键介于startId,和endId之间的行
4、成功运行?startId=endId :startId=startId
5、循环进行2-4
如果运行中出现一次上述错误,例如导入重复数据而引发的主键唯一错误,那么从这个出错开始,后续的所有运行都会出错,虽然最终数据依然会完成同步,但每一次运行都相当于进行了一次全量同步,增量同步的设置将失去意义。
任务构建阶段填写报警邮件,在出现报错时及时发现并处理。
前提是表中有类似updatetime字段,可以为时间标准格式或者时间戳。
注意:
第一次时间增量时间要早于表的全部update_time,就是把时间戳传入json的lastTime和currentTime;
FROM_UNIXTIME是为了将表里面的标准格式转换为时间戳进行比较,如果表里默认为时间戳就不需要这个函数;
querySql中使用了select后需要把reader里面column,splitP需要删除掉 ;
“writeMode”: "update"相对insert会好一些;
对于写入时如果不提前创建分区就会报:
有如下解决方案:
1、修改DataX源码中hdfswriter下的hdfshelper.java,添加createPath方法:
- public boolean createPath(String filePath) {
-
- Path path = new Path(filePath);
-
- boolean exist = false;
-
- try {
-
- if (fileSystem.exists(path)) {
-
- String message = String.format("文件路径[%s]已存在,无需创建!",
-
- "message:filePath =" + filePath);
-
- LOG.info(message);
-
- exist = true;
-
- } else {
-
- exist = fileSystem.mkdirs(path);
-
- }
-
- } catch (IOException e) {
-
- String message = String.format("创建文件路径[%s]时发生网络IO异常,请检查您的网络是否正常!",
-
- "message:filePath =" + filePath);
-
- LOG.error(message);
-
- throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
-
- }
-
- return exist;
-
- }
2、修改hdfswriter目录下的HdfsWriter.java,在最后的else调用createPath并修改返回日志
3、将源码打包并替换datax/plugin/writer/hdfswriter/hdfswriter-0.0.1-SNAPSHOT.jar
4、测试
来源:
(二次开发DataX以支持HIVE分区表_datax二次开发_MaxineSgr的博客-CSDN博客)
官方文档中有解释,DataX只支持一次写入单个分区,可以说DataX对hive分区表非常不友好。
DataX-web将写入过程做了简化,但仍然无法做到灵活的创建分区和一次多个写入
任务的设置如下
如果按上图设置,最后的分区生成是这样的:
其中分区字段仅支持类似时间的字段,分区时间最多可以选到-20,即当前时间的前二十天。
json设置如下,${partition}是固定格式,不能更改
由最后创建的目录可以推断partition是由源码中输入的day与系统当前日期更改格式并计算天数后拼凑成的参数。目前网上没有教程但理论上可以自行修改这部分源码以让分区字段与日期选择更加灵活
注意由此动态分区自动生成路径的方法,写入后可能在Hive端无法查询数据,用MSCK REPAIR TABLE将没有写入metastore的分区信息写入metastore。
MSCK REPAIR TABLE hive_partition_test
运行结果:
在生产中,假设今日是一月二日,数据接收后存入一张日
1、Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、 Linux、 Unix上运行,绿色无需安装,数据抽取高效稳定。
2、Kettle 中文名称叫水壶,该项目的主程序员MATT 希望把各种数据放到一个壶里,然后以一种指定的格式流出。
3、Kettle这个ETL工具集,它允许你管理来自不同数据库的数据,通过提供一个图形化的用户环境来描述你想做什么,而不是你想怎么做。
4、Kettle中有两种脚本文件,transformation和job,transformation完成针对数据的基础转换,job则完成整个工作流的控制。
5、Kettle(现在已经更名为PDI,Pentaho Data Integration-Pentaho)。
(1)Kettle下载安装非常简单,官网下载安装包,找下载数最多的下载解压即可
(Pentaho from Hitachi Vantara - Browse Files at SourceForge.net)
(2)将Mysql连接驱动置于pdi-ce\data-integration\lib下
(3)如果出现hive连接错误的情况,需要将hive lib下的jar包复制到pdi-ce\data-integration\plugins\pentaho-big-data-plugin\hadoop-configurations\hdp30\lib
并将hive和hadoop的配置文件都复制到pdi-ce-8.3.0.0-371\data-integration\plugins\pentaho-big-data-plugin\hadoop-configurations\hdp30
具体教程可见(kettle连接Hive配置(一)-爱码网)
需要注意的是,官网下载的pdi-9.3版本不知为何缺少hive连接的相关配置文件,且目前缺少该版本的博客,可以下载8.3版本
但是kettle8.3版本不兼容mysql8以上版本的jdbc-connecter,若设备使用的是mysql8以上版本,有如下解决办法
1、数据库类型选择Generic Database,相当于不用kettle预设的类型,自定义连接
2、在kettle的安装目录下data-integration\simple-jndi\jdbc.properties加入jdbc的连接信息,相当于预设好一个连接,直接使用
然后在连接类型MySQL中选择预设好的JNDI
教程参考(kettle连接mysql8.0以上版本_北顾南望的博客-CSDN博客_kettle连接mysql8.0)
设置一个流程将两个MySQL表的内容输入,排序后根据,id将
可以看到并更改字段关系映射
一定要仔细定义数据同步中的高级标签,否则会报错“It was not possible to find operation field [null] in the input stream!”
mysql->mysql运行时长
hive->mysql运行时长
在“作业”中可以对编辑好的“转换”设计运行间隔以达到定时同步的功能。
作为阿里的开源项目,DataX的稳定性与可维护性非常有保障,网上有大量的测试与报错博客,更有补足功能、提供UI操作的DataX-web,使得DataX可以投入到项目中来。另:官方提供了对TDengine的reader和writer。
但DataX有缺陷,作为开源项目,某些功能官方维护不及时,针对某些急需实现的功能(如实例1、(3)中提到的writer过程中的null问题),网上有修改源码的教程。由于本次测试涉及面较窄,生产生活中还可能存在类似情况,到时候需要自主寻错、修改源码。
且DataX对Hive数据的读取可以看作对hdfs上存储的数据进行读取,只能是顺序的,且不能对列有细致操作,实际工作中有类似需要,只能先在数据库中操作,再进行同步作业。
DataX-web提供了一个方便快捷的向分区表写入的方法,但不能自由地多次地写入分区,只能一次写入一个分区,分区字段不能选择除时间以外的字段,时间的选择并不自由,只能选当前时间前20天内的一天作为分区。如果有相关需求,可以进行源码的二次开发,建议二次开发时一定做好记录,提高部门同事间沟通效率,以提高工作效率。
虽有不足但DataX总体上能承担日常生活中的数据同步工作。DataX官方插件有严格的格式设置,编写json时请先查看相关插件的参数介绍。
Kettle提供了强大的图形化Spoon,可以做到中小型需求不写一行代码,大大减少了工作量,且对于Hive数据的读取可以做列的操作,数据的读取是依靠字段进行的而不是顺序进行的,比DataX、Sqoop更加精细。kettle重点是中间变换工作,集成功能速度较慢
时间对比,上图为datax,下图为kettle,可以看到差距还是比较大的
MySQL->MySQL:
Hive->MySQL:
可以看到在同一台机器上运行同一份数据时,在MySQL->MySQL和Hive->MySQL的情况下,仅29万条数据,Kettle和DataX在效率上就存在明显差距。随着数据量的提升,差距会愈发明显。因此若选用Kettle则需要考虑在海量数据的前提下的调优问题,业务越复杂,需要优化的组件就越多。
且Kettle spoon对于定时运行缺乏集成化的设置,一旦业务数量变多,执行定时调度时,就只能通过系统自带的定时任务调度去进行管理。无法统一,假如要做统一的管理,需要安装一套jenkins,但配置和后续的运维成本可能较高。以及kettle的内存占用较高,无法最大效率地利用服务器资源
注意:
(1)hive作为数据源的时候,注意在reader设置中不要跳过表头,否则会缺失第一列数据
(2)单机版的DataX,在脏数据统计上有点小问题。举个例子:先运行了一个任务A,假设这个任务A有5条脏数据,errorlimit设置了record:0,在运行时,这个任务是一定会因为脏数据而终止执行。任务A终止执行后,你紧接着运行任务B,假设任务B没有脏数据,errorlimit也设置了record:0,但有可能任务B在运行时,它也会报“在运行的过程中捕获了5条脏数据,任务结束”。也就是任务A的脏数据会影响任务B。问题出在LocalTGCommunicationManager这个类中,它使用了一个静态变量taskGroupCommunicationMap来存储脏数据。
(1)(问题可能出现在hdfsreader等)
2022-12-15 13:59:56.077 [0-0-0-reader] ERROR StdoutPluginCollector - 脏数据:
{"message":"No enum constant com.alibaba.datax.plugin.unstructuredstorage.reader. UnstructuredStorageReaderUtil.Type.BIGINT","record":[],"type":"reader"}
问题原因:datax支持的数据类型与hive有不同
DataX 数据类型 | Hive表 数据类型 |
Long | TINYINT,SMALLINT,INT,BIGINT |
Double | FLOAT,DOUBLE |
String | String,CHAR,VARCHAR,STRUCT,MAP,ARRAY,UNION,BINARY |
Boolean | BOOLEAN |
Date | Date,TIMESTAMP |
其余多种数据库间数据类型映射,可参考(datax与多种数据库间数据类型映射_datax 数据类型_chimchim66的博客-CSDN博客)
将reader中所有其余数据库的数据类型依次按照对照表改为datax支持的数据类型。
(2)(问题可能出现在hdfsreader等)
在read过程中,\N不能转换为DATE属性的值,报类型转换错误
1、在reader中设置,使空值不再参与转换,直接跳过
2、设中转表将DATE类型改为STING类型,再由数据库或olap组件内部转换格式。
(3)(问题可能出现在将数据同步后的Hive又作为数据源输出时)
类型转换错误,无法将[]转换为[date]等等一系列类似错误
hdfswriter并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,hdfswriter会将null值存储为空字符串(’’),但Hive默认的null值存储格式为\N。所以后期将DataX同步的文件导入Hive表就会出现问题。
1、修改datahdfswriter的源码,增加自定义null值存储格式的逻辑,参考(记Datax3.0解决MySQL抽数到HDFSNULL变为空字符的问题_谭正强的博客-CSDN博客_datax nullformat),然后mvn打包。在hdfswriter阶段就把null值划分清楚,避免后续问题积少成多。
在hive表同步到hive表的过程中reader和writer都要设置nullFormat参数。
2、或是在Hive中建表时指定null值存储格式为空字符串(’’)(不推荐)
例如:
DROP TABLE IF EXISTS base_province;
CREATE EXTERNAL TABLE base_province
(
`id` STRING COMMENT '编号',
`name` STRING COMMENT '省份名称',
`region_id` STRING COMMENT '地区ID',
`area_code` STRING COMMENT '地区编码',
`iso_code` STRING COMMENT '旧版ISO-3166-2编码,供可视化使用',
`iso_3166_2` STRING COMMENT '新版IOS-3166-2编码,供可视化使用'
) COMMENT '省份表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
NULL DEFINED AS ''
LOCATION '/base_province/';
(4)(问题可能出现在hdfsreader、hdfswriter等)
在同步过程中,出现列数据混乱,或日志出现类型转换错误
由于datax对于hdfs读写非常格式化,如果建表的列顺序不对或列分隔符出现在表中的话,都会引起数据读写的混乱的情况。
实际建表中建议hive列分隔符不要出现任何表中可能有的元素,例如“\t”,“,”“;”等。创建列的先后顺序严格一致,以减少同步过程中数据混乱。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。