CREATE TABLE `part2hdfs` (
`part_id` bigint(20) NOT NULL AUTO_INCREMENT,
`CREATE_TIME` int(11) DEFAULT NULL,
`LAST_ACCESS_TIME` int(11) DEFAULT NULL,
`PART_NAME` varchar(767) DEFAULT NULL,
`SD_ID` bigint(20) DEFAULT NULL,
`TBL_ID` bigint(20) DEFAULT NULL,
PRIMARY KEY (`part_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1114096 DEFAULT CHARSET=utf8
createtab_stmt
CREATE TABLE `part2hdfs`(
  `part_id` int,
  `create_time` int,
  `last_access_time` int,
  `part_name` string,
  `sd_id` int,
  `tbl_id` int)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'field.delim'='\t',
  'serialization.format'='\t')
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://wdp-offline/data/warehouse/dw/part2hdfs'
TBLPROPERTIES (
  'numFiles'='2',
  'numRows'='0',
  'rawDataSize'='0',
  'totalSize'='14673398',
  'transient_lastDdlTime'='1622112497')
hadoop fs -ls /data/warehouse/dw/part2hdfs
{ "job": { "setting": { "speed": { "channel":1 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "mysql用户名", "password": "mysql密码", #同步的mysql数据表列名集合 "column":["PART_ID","CREATE_TIME","LAST_ACCESS_TIME","PART_NAME","SD_ID","TBL_ID"], # 根据某个表的字段进行划分同步任务,提高并行度 "splitPk": "PART_ID", "connection": [ { # mysql表 "table":["part2hdfs"], "jdbcUrl": [ # jdbc链接地址 "jdbc:mysql://mysql-host:mysql-host/database_name" ] } ] } }, "writer": { "name": "hdfswriter", "parameter": { "defaultFS": "hdfs://haoop:8020", "fileType": "ORC", "path": "/data/warehouse/dw/part2hdfs", # hive数据表 "fileName": "part2hdfs", "column":[ { "name":"PART_ID", "type":"INT" }, { "name":"CREATE_TIME", "type":"INT" }, { "name":"LAST_ACCESS_TIME", "type":"INT" }, { "name":"PART_NAME", "type":"STRING" }, { "name":"SD_ID", "type":"INT" }, { "name":"TBL_ID", "type":"INT" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress": "SNAPPY" } } } ] } }
python bin/datax.py conf/table2hdfs.json
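After the job finishes, a quick sanity check is to list the files DataX produced and compare row counts against the source; a sketch (database qualifier omitted, adjust to wherever the table lives):

# Files written by DataX should now appear under the table location.
hadoop fs -ls /data/warehouse/dw/part2hdfs
# The Hive row count should match select count(*) on the MySQL side.
hive -e "select count(*) from part2hdfs"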
Dynamically modify the SQL statement used to read the MySQL table, and dynamically modify the HBase table name that the data is written to.
Configuration file:
{ "job": { "setting": { "speed": { "byte": 8388608, "channel": 3 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "xxx", "password": "xxx", "connection": [ { "querySql": ["$DS"], "jdbcUrl": ["jdbc:mysql://xxx:3306/xxx"] } ] } }, "writer": { "name": "hbase11xwriter", "parameter": { "hbaseConfig": { "hbase.zookeeper.quorum": "xxx" }, "table": "$DT", "mode": "normal", "rowkeyColumn": [ { "index":0, "type":"string" }, { "index":-1, "type":"string", "value":"-" }, { "index":1, "type":"string" } ], "column" : [ { "index":1, "name": "data:studentName", "type": "string" } ], "encoding": "utf-8" } } } ] } }
Launch the job and pass the query and the target table on the command line; the -DDS and -DDT values are substituted into the $DS and $DT placeholders of the configuration:

python bin/datax.py -p "-DDS='select xx,xx from xx where xx = xx' -DDT=student" conf/mysql2Hbase-test.json
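Since both the query and the table name arrive as -D parameters, one job file can drive many syncs from a wrapper script. A minimal sketch, with the table list and column names as placeholder assumptions:

#!/bin/bash
# Sync several MySQL tables into same-named HBase tables with one DataX job template.
for table in student teacher course; do        # hypothetical table list
  sql="select id, name from ${table}"          # hypothetical source columns
  python bin/datax.py -p "-DDS='${sql}' -DDT=${table}" conf/mysql2Hbase-test.json
done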
Two ways to increase DataX throughput:
1. Raise the channel count in the job JSON:
   "setting": { "speed": { "channel": 1 } }
2. Raise the per-task-group channel count in core.json:
   "taskGroup": { "channel": 5 }
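For reference, the two knobs live in different files; a sketch of where each fragment sits, with illustrative values (paths follow a stock core.json, so verify against your installation):

# In the job JSON (e.g. conf/table2hdfs.json): per-job concurrency.
"setting": {
  "speed": {
    "channel": 4
  }
}

# In conf/core.json under the DataX home: channels per task group.
"core": {
  "container": {
    "taskGroup": {
      "channel": 5
    }
  }
}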