DataX: Syncing MySQL Data to Hive (Windows 11)

Installing and Configuring DataX

Preparation

  • Create the MySQL table
CREATE TABLE `part2hdfs` (
  `part_id` bigint(20) NOT NULL AUTO_INCREMENT,
  `CREATE_TIME` int(11) DEFAULT NULL,
  `LAST_ACCESS_TIME` int(11) DEFAULT NULL,
  `PART_NAME` varchar(767) DEFAULT NULL,
  `SD_ID` bigint(20) DEFAULT NULL,
  `TBL_ID` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`part_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1114096 DEFAULT CHARSET=utf8
  • Create the Hive table (the DDL below is the SHOW CREATE TABLE output)
CREATE TABLE `part2hdfs`(
  `part_id` int, 
  `create_time` int, 
  `last_access_time` int, 
  `part_name` string, 
  `sd_id` int, 
  `tbl_id` int)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
WITH SERDEPROPERTIES ( 
  'field.delim'='\t', 
  'serialization.format'='\t') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://wdp-offline/data/warehouse/dw/part2hdfs'
TBLPROPERTIES (
  'numFiles'='2', 
  'numRows'='0', 
  'rawDataSize'='0', 
  'totalSize'='14673398', 
  'transient_lastDdlTime'='1622112497')
  • Check the Hive table's location on HDFS
hadoop fs -ls /data/warehouse/dw/part2hdfs
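If you'd rather get the location from Hive itself, the table metadata can be queried directly; a minimal sketch, assuming the hive CLI is on the PATH and the table is in the current database:

# Print the table metadata and keep only the HDFS location line
hive -e "DESCRIBE FORMATTED part2hdfs;" | grep -i "location"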


Configuring the MySQL Source and the Hive Target

  • vim conf/table2hdfs.json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "mysql-user",
            "password": "mysql-password",
            "column": ["PART_ID", "CREATE_TIME", "LAST_ACCESS_TIME", "PART_NAME", "SD_ID", "TBL_ID"],
            "splitPk": "PART_ID",
            "connection": [
              {
                "table": ["part2hdfs"],
                "jdbcUrl": ["jdbc:mysql://mysql-host:3306/database_name"]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://namenode-host:8020",
            "fileType": "text",
            "path": "/data/warehouse/dw/part2hdfs",
            "fileName": "part2hdfs",
            "column": [
              { "name": "part_id", "type": "INT" },
              { "name": "create_time", "type": "INT" },
              { "name": "last_access_time", "type": "INT" },
              { "name": "part_name", "type": "STRING" },
              { "name": "sd_id", "type": "INT" },
              { "name": "tbl_id", "type": "INT" }
            ],
            "writeMode": "append",
            "fieldDelimiter": "\t"
          }
        }
      }
    ]
  }
}
Notes (JSON does not allow comments, so the field annotations live here):
  • column: the MySQL columns to sync.
  • splitPk: the read is split into parallel tasks on this column, raising parallelism.
  • table / jdbcUrl: the source MySQL table and its JDBC URL; host, port, and database name are placeholders.
  • defaultFS: the HDFS NameNode address (a placeholder).
  • path / fileName: the Hive table's HDFS directory and the prefix for the files DataX writes there.
  • fileType / fieldDelimiter: "text" and "\t" to match the TextFile Hive table created above; an ORC fileType would write files that table cannot read.
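Rather than writing the job file by hand, DataX can also print an empty job template for a given reader/writer pair, which you then fill in; a sketch, assuming you run it from the DataX install directory:

# Print a skeleton mysqlreader -> hdfswriter job to stdout
python bin/datax.py -r mysqlreader -w hdfswriter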
  • Run the sync job
python bin/datax.py conf/table2hdfs.json
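After the job reports success, it is worth confirming that both sides hold the same number of rows; a minimal sketch, assuming the mysql and hive CLIs are available (host, user, and database names are placeholders):

# Row count on the MySQL side (prompts for the password)
mysql -h mysql-host -u mysql-user -p -e "SELECT COUNT(*) FROM database_name.part2hdfs;"
# Row count on the Hive side
hive -e "SELECT COUNT(*) FROM part2hdfs;"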

Passing Dynamic Parameters

  • The idea: dynamically substitute the SQL statement used to read the MySQL table, and dynamically set the HBase table name to write to. The job file below references the variables $DS (the query) and $DT (the target table), which are filled in on the command line.

  • Configuration file (conf/mysql2Hbase-test.json)

{
    "job": {
        "setting": {
          "speed": {
            "byte": 8388608,
            "channel": 3
          },
          "errorLimit": {
            "record": 0,
            "percentage": 0.02
          }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxx",
                        "password": "xxx",                                 
                        "connection": [
                            {
                                "querySql": ["$DS"],
                                "jdbcUrl": ["jdbc:mysql://xxx:3306/xxx"]
                            }
                        ]
                    }                   
                },
                "writer": {
                    "name": "hbase11xwriter",
                    "parameter": {              
                        "hbaseConfig": {
                          "hbase.zookeeper.quorum": "xxx"
                        },
                        "table": "$DT",
                        "mode": "normal",
                        "rowkeyColumn": [
                            {
							"index":0,
							"type":"string"
							},
							{
							"index":-1,
							"type":"string",
							"value":"-"
							},
							{
							"index":1,
							"type":"string"
							}               
                        ],
                        "column" : [ {
                                "index":1,
                                "name": "data:studentName",
                                "type": "string"
                              }
                        ],
                        "encoding": "utf-8"
                    }
                }
            }
        ]
    }
}
  • Run the sync, passing the query as DS and the target table as DT
python bin/datax.py -p "-DDS='select xx,xx from xx where xx = xx' -DDT=student" conf/mysql2Hbase-test.json
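Because both the query and the target table are injected at run time, one job file can drive many tables; a hypothetical wrapper sketch (the table names and column list are placeholders):

#!/bin/bash
# Sync several MySQL tables into same-named HBase tables,
# reusing the single parameterized job file above.
for t in student teacher course; do
  python bin/datax.py \
    -p "-DDS='select id,name from $t' -DDT=$t" \
    conf/mysql2Hbase-test.json
done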

Tuning

  • Increase parallelism
1. Raise the channel count in the job JSON, e.g.:
"setting": {
  "speed": {
    "channel": 4
  }
}
For mysqlreader the read only actually splits across channels when splitPk is set, as in the job above.
2. Raise the channel count per task group in conf/core.json:
"taskGroup": {
  "channel": 5
}
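More channels also mean a bigger footprint for the DataX process, so it can help to raise the JVM heap at the same time; a sketch using datax.py's --jvm flag (heap sizes are examples, tune them to your machine):

# Run the job with a 4 GB heap to go with the higher channel count
python bin/datax.py --jvm="-Xms4g -Xmx4g" conf/table2hdfs.json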

References

  • MySQL to HBase: https://www.it610.com/article/1294182090094485504.htm
  • https://github.com/alibaba/DataX/blob/master/userGuid.md
  • https://codechina.csdn.net/mirrors/alibaba/datax/-/tree/master/oraclereader
  • https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
  • https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md