赞
踩
目录
9.3.1. 在Hive中建表的时候指定表( NULL DEFINED AS '' )
DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能
源码地址: 点击进入
组件地址: 点击下载
类型 | 数据源 |
关系型数据库 | MySql |
Oracle | |
SQLServer | |
PostgreSQL | |
NoSql数据存储 | HBase 0.94 / 1.1 |
Phoenix 4.x / 5.x | |
MongoDB | |
Hive | |
无结构化数据存储 | TxtFile |
FTP | |
HDFS | |
ElasticSearch 支持读不支持写 |
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
对于使用者,只需要学习DataX的数据源配置方式就可以将数据源里面的数据进行传输
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中
数据采集模块,负责采集数据源的数据,将数据发送给Framework
数据写入模块,负责不断向Framework取数据, 并将数据写入到目的端
用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,控流,并发,数据转换等核心技术问题
DataX采集全量数据的原理是通过sql查询的方式获取数据,查询语句会被切割,例如 通过时间查询,按照时间维度将数据切分成多个Task, DataX在传输数据的时候将启动TaskGroup,每个TaskGroup负责一定的并发度运行其所得的Task,单个TaskGroup的并发的固定为5,总TaskGroup数量与配置的总并发度有关: TaskGroup数量 = 总并发度 / 5
功能 | DataX | Sqoop |
运行模式 | 单进程多线程 | MR |
分布式 | 不支持,可以通过调度系统规避 | 支持 |
控流 | 有 | 需要定制开发 |
统计信息 | 已有一些统计,上报需要定制 | 没有, 分布式数据数据不方便 |
数据校验 | 在core部分与校验功能 | 没有, 分布式收集数据不方便 |
监控 | 需要定制 | 需要定制 |
下载DataX安装包到服务器解压并测试运行
python <datax_home>/bin/datax.py <datax_home>/job/job.json
可以使用如下命名查看DataX配置文件模板
python bin/datax.py -r mysqlreader -w hdfswriter
配置文件模板如下,json最外层是一个job,job包含setting和content两部分,其中setting用于对整个job进行配置,content用户配置数据源和目的地
Reader和Writer的具体参数可参考官方文档,点击查看
- {
- "job": {
- "content": [{
- "reader": {},
- "writer": {}
- }],
- "setting": {
- "speed": {
- "channel": 1
- }
- }
- }
- }
- {
- "name": "mysqlreader", //Reader名称 , 固定写法
- "parameter": {
- "username": "root", //数据库用户密码
- "password": "123456",
- "connection": [{
- "jdbcUrl": ["jdbc:mysql://node1:3306/gmall"],//数据库jdbc url
- "table": ["tb"] //数据库jdbc url
- }],
- "column": ["id", "name", "age"], //同步的字段 ["*"]表示所有字段
- "where": "id>=0", //while过滤条件
- "splitPk": "" //分片字段,如果没有这个字段,或者值为空,则只有一个Task
- }
- }
- {
- "name": "mysqlreader", //Reader名称 , 固定写法
- "parameter": {
- "username": "root", //数据库用户密码
- "password": "123456",
- "connection": [{
- "jdbcUrl": ["jdbc:mysql://node1:3306/gmall"],//数据库jdbc url
- "table": ["select * from tb"] //数据库jdbc url
- }]
- }
- }
- {
- "name": "hdfswriter", //Writer名称 , 固定写法
- "parameter": {
- "column": [{ //列信息,包括列名和类型 类型为Hive表字段类型,目前不支持decimal,binary,arrays,maps,struicts
- "name": "id",
- "type": "bigint"
- }, {
- "name": "name",
- "type": "string"
- }, {
- "name": "age",
- "type": "bigint"
- }],
- "defaultFS": "hdfs://node2:8020", //HDFS文件系统namenode节点地址,不支持传HA的集群名
- "path": "/mydatax", //HDFS文件系统目标路径
- "fileName": "tb", //HDFS文件名前缀
- "fileType": "text", //HDFS文件类型
- "compress": "gzip", //HDFS压缩类型 text文件支持压缩gzip bzip2; orc文件支持压缩NONE SNAPPY
- "fieldDelimiter": "\t", //HDFS的分隔符
- "writeMode": "append" //数据写入的模式 append 追加 ; nonConflict: 若写入目录有同名(前缀相同文件),报错
- }
- }
注意:
HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(''),而Hive默认的null值存储格式为\N。所以后期将DataX同步的文件导入Hive表就会出现问题。
解决方法有两种,任意一种都可以
- {
- "speed": { //传输速度配置
- "channel": 1 //并发数
- },
- "errorLimit": { // 容错比例配置
- "record": 1, //错误条数上限,超出则任务失败
- "percentage": 0.02 //错误比例上限,超出则任务失败
- }
- }
- {
- "job": {
- "content": [{
- "reader": {
- "name": "mysqlreader",
- "parameter": {
- "username": "root",
- "password": "123456",
- "connection": [{
- "jdbcUrl": ["jdbc:mysql://node1:3306/yangxp"],
- "table": ["tb"]
- }],
- "column": ["id", "name", "age"],
- "where": "id>=0",
- "splitPk": ""
- }
- },
- "writer": {
- "name": "hdfswriter",
- "parameter": {
- "column": [{
- "name": "id",
- "type": "bigint"
- }, {
- "name": "name",
- "type": "string"
- }, {
- "name": "age",
- "type": "bigint"
- }],
- "defaultFS": "hdfs://node2:8020",
- "path": "/mydatax",
- "fileName": "tb",
- "fileType": "text",
- "compress": "gzip",
- "fieldDelimiter": "\t",
- "writeMode": "append"
- }
- }
- }],
- "setting": {
- "speed": {
- "channel": 1
- },
- "errorLimit": {
- "record": 1,
- "percentage": 0.02
- }
- }
- }
- }
- DROP TABLE IF EXISTS tb;
- CREATE EXTERNAL TABLE tb
- (
- `id` STRING COMMENT '编号',
- `name` STRING COMMENT '姓名',
- `age` STRING COMMENT '年龄'
- ) COMMENT '年龄表'
- ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- NULL DEFINED AS ''
- LOCATION '/mydatax/';
将配置内容写入mysql_to_hive.json配置文件到目录<datax_home>/job/下并启动任务
python <datax_home>/bin/datax.py <datax_home>/job/mysql_to_hive.json
数据被压缩后上传到Hive,直接在看会乱码
可以借助hive客户端查看表数据,或者使用hdfs的zcat查看
hdfs dfs -cat /mydatax/tb__af7eac41_a69e_4976_bb68_a98cd5d3a689.gz|zcat
通常情况下,离线数据同步任务需要每日定时重复执行,故HDFS上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此DataX配置文件中HDFS Writer的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。
DataX传参的用法如下,在JSON配置文件中使用${param}引用参数,在提交任务时使用-p"-Dparam=value"传入参数值,具体示例如下。
- {
- "name": "hdfswriter",
- "parameter": {
- ...
- "path": "/mydatax/${dt}",
- ...
- }
- }
python <datax_home>/bin/datax.py -p"-Ddt=2023-03-04" <datax_home>/job/mysql_to_hdfs.json
DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。
参数 | 说明 |
job.setting.speed.channel | 并发数 |
job.setting.speed.record | 总record限速 |
job.setting.speed.byte | 总byte限速 |
core.transport.channel.speed.record | 单个channel的record限速,默认为10000(10000条/s) |
core.transport.channel.speed.byte | 单个channel的byte限速,默认值1024*1024(1M/s) |
注意事项:
计算公式为:
min(总byte限速/单个channel的byte限速,总record限速/单个channel的record限速)
配置示例:
- {
- "core": {
- "transport": {
- "channel": {
- "speed": {
- "byte": 1048576 //单个channel byte限速1M/s
- }
- }
- }
- },
- "job": {
- "setting": {
- "speed": {
- "byte" : 5242880 //总byte限速5M/s
- }
- },
- ...
- }
- }
当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
python <datax_home>/bin/datax.py --jvm="-Xms8G -Xmx8G" <datax_home>/job/mysql_to_hdfs..json
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。