当前位置:   article > 正文

【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive_datax mysql hive

datax mysql hive

【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive

一、mysql全量导入hive[分区表]

需求介绍:

本需求将模拟从MySQL中向Hive数仓中导入数据,数据以时间分区。测试两种导入场景,一种是将数据全量导入,即包含所有时间分区;另一种是每天运行调度,仅导入当天时间分区中的用户数据。


  • mysql表建表语句:
create table t_order(
	id   	 	int   primary key auto_increment,
	amt  	 	decimal(10,2),
	`status` 	int  default 0,
	user_id  	int,
	create_time timestamp DEFAULT CURRENT_TIMESTAMP,
	modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • hive
create table t_order(
	id   	 	int,
	amt  	 	decimal(10,2),
	`status` 	int,
	user_id  	int,
	create_time date,
	modify_time date
)partitioned by (dt string)
row format delimited 
fields terminated by '\t'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

注意字段时间戳,我们将从以上MySQL向Hive导入数据。

  • 编写datax的json脚本
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://hadoop10:3306/spark-dw"],
                                "querySql": [
                                    "select id,amt,status,user_id,create_time,modify_time from t_order"
                                ]
                            }
                        ],
                        "password": "0000",
                        "username": "root",
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
							{"name": "id","type": "int"},
							{"name": "amt","type": "double"},
							{"name": "status","type": "int"},
							{"name": "user_id","type": "int"},
							{"name": "create_time","type": "string"},
							{"name": "modify_time","type": "string"}
                     		],
                        "defaultFS": "hdfs://hadoop10:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "t_order",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/test_hive.db/t_order/dt=$dt",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 执行导入操作

在mysql中添加测试数据 导入mysql中7-11的数据到hive下7-11分区

insert into t_order(amt,user_id) values(100,1001)
insert into t_order values(null,100,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39')
insert into t_order values(null,120,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39')
  • 1
  • 2
  • 3

在hive下创建分区

alter table t_order add partition(dt='2023-07-11')
  • 1

运行dataX脚本

python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-11" /opt/installs/datax/job/mysql2hive.json
  • 1

此部分的操作是将先插入mysql的三条数据导入到hive。


在mysql中添加测试数据 导入mysql中7-12的数据到hive下7-12分区

insert into t_order values(null,200,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');
insert into t_order values(null,220,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');
  • 1
  • 2

在hive下创建分区

alter table t_order add partition(dt='2023-07-12')
  • 1

运行datax脚本

python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-12" /opt/installs/datax/job/mysql2hive.json
  • 1

此部分的操作是将先插入mysql的三条数据和本次插入mysql的数据都导入到hive。
根据查询结果可以看到,此时我们重复导入了第一部分的数据,这就是全量导入。

二、mysql增量导入hive

大方向:事实表用增量[订单表] 维度表用全量[商品表]

绝大部分公司采用的方案:全量为主、增量为辅

要想采用增量导入还有一个问题是你的业务库表能够支持增量导入

1. 增量导入的第一种实现方法

根据 id主键,查询hive表中最大的id值,然后去mysql中查询大于上述id值的数据。
如果有些使用uuid的,则不能用id,这种方案不适用于对修改的数据进行同步。

2. 另一种方法是 时间字段

在表中增加一个modify_time字段,如果数据新增或者修改,可以根据这个字段查询数据抽取到hive

3. dataX脚本

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://hadoop10:3306/spark-dw"],
                                "querySql": [
                                    "select id,amt,status,user_id,create_time,modify_time from t_order where date_format(modify_time,'%Y-%m-%d') = '$dt'"
                                ]
                            }
                        ],
                        "password": "0000",
                        "username": "root",
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
							{"name": "id","type": "int"},
							{"name": "amt","type": "double"},
							{"name": "status","type": "int"},
							{"name": "user_id","type": "int"},
							{"name": "create_time","type": "string"},
							{"name": "modify_time","type": "string"}
                     		],
                        "defaultFS": "hdfs://hadoop10:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "t_order",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/test_hive.db/t_order/dt=$dt",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

运行该增量脚本,即可按照分区的日期,每次导入需要的mysql数据到hive。

三、利用Python自动生成Datax的json脚本

1. 创建mysql和hive数据库

create table t_student(
	id   	 int PRIMARY key,
	name  	 	varchar(50),
	`age` 		int
);

create table t_person(
	id   	 		int PRIMARY key,
	name  	 	varchar(50),
	parentid 		int
);

INSERT into t_student values
(1,'zhanmusi',15),
(2,'lisi',55),
(3,'lisi',66);

INSERT into t_person values
(1,'miky',06),
(2,'tom',16),
(3,'jakcon',26);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
create table ods_t_student(
	id   	 	int,
	name  	 	string,
	`age` 		int
)partitioned by (dt string)
row format delimited 
fields terminated by '\t'

create table ods_t_person(
	id   	 		int,
	name  	 		string,
	parentid 		int
)partitioned by (dt string)
row format delimited 
fields terminated by '\t'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2. 修改python脚本里面的密码(2处)和hdfs端口

import json
import sys
import pymysql


def gen_json(dbname, tablename):
 s1 = {
     "job": {
         "content": [
             {
                 "reader": {
                     "name": "mysqlreader",
                     "parameter": {
                         "connection": [
                             {
                                 "jdbcUrl": ["jdbc:mysql://hadoop10:3306/" + dbname + "?useSSL=false"],
                                 "table": [tablename]
                             }
                         ],
                         "password": "0000",  # 密码
                         "username": "root",
                         "column": getColumn(dbname, tablename)
                     }
                 },
                 "writer": {
                     "name": "hdfswriter",
                     "parameter": {
                         "column": getColumnAndType(dbname, tablename),
                         "defaultFS": "hdfs://hadoop10:8020",  # hdfs端口
                         "fileType": "text",
                         "path": "/user/hive/warehouse/ods_" + tablename + "/dt=$dt",
                         "fieldDelimiter": "\t",
                         "fileName": tablename,
                         "writeMode": "append"
                     }
                 }
             }
         ],
         "setting": {
             "speed": {
                 "channel": "1"
             }
         }
     }
 }

 with open('d:/test/' + tablename + '.json', 'w') as f:
     json.dump(s1, f)


def queryDataBase(dbname, tablename):
 conn = pymysql.connect(user='root', password='0000', host='hadoop10')  # 密码
 cursor = conn.cursor()
 cursor.execute(
     "select column_name ,data_type from information_schema.`COLUMNS` where TABLE_SCHEMA = %s and table_name = %s order by ordinal_position",
     [dbname, tablename])
 fetchall = cursor.fetchall()
 cursor.close()
 conn.close()
 return fetchall


def getColumn(dbname, tablename):
 k1 = queryDataBase(dbname, tablename)
 k2 = list(map(lambda x: x[0], k1))
 return k2


def getColumnAndType(dbname, tablename):
 k1 = queryDataBase(dbname, tablename)
 mappings = {
     'bigint': 'bigint',
     'varchar': 'string',
     'int': 'int',
     'datetime': 'string',
     'text': 'string'
 }
 k2 = list(map(lambda x: {"name": x[0], "type": mappings[x[1].lower()]}, k1))
 return k2


if __name__ == '__main__':
 l = sys.argv[1:]
 dbname = l[0]  # mysql数据库名
 tablename = l[1]  # 表名
 gen_json(dbname, tablename)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

3. 运行python脚本

(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_student

(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_person

  • 1
  • 2
  • 3
  • 4

4. 将生成的json文件上传到linux

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oB30wKR6-1689299346463)(上课笔记-day13.assets\1689068747698.png)]

5. 编写shell脚本 b.sh

#! /bin/bash

dt=$1

if [ ''$1 == '' ]
then
  dt=$(date -d yesterday +%Y-%m-%d)
fi

echo $dt

s=$(hive -e "show partitions ods_t_student partition(dt='$dt')")

echo === $s ====

if [ "$s" == "partition" ]
then
 hive -e "alter table ods_t_student add partition(dt='$dt')"
else
 echo "$dt分区已经存在"
fi

python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/t_student.json



s=$(hive -e "show partitions ods_t_person partition(dt='$dt')")

echo === $s ====

if [ "$s" == "partition" ]
then
 hive -e "alter table ods_t_person add partition(dt='$dt')"
else
 echo "$dt分区已经存在"
fi

python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/t_person.json

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

6. 运行shell

root@hadoop10 app]# sh b.sh 2023-07-13

任务启动时刻                    : 2023-07-13 02:31:38
任务结束时刻                    : 2023-07-13 02:31:50
任务总计耗时                    :                 12s
任务平均流量                    :                2B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   3
读写失败总数                    :                   0

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • hive
id|name    |age|dt        |
--|--------|---|----------|
 1|zhanmusi| 15|2023-07-13|
 2|lisi    | 55|2023-07-13|
 3|lisi    | 66|2023-07-13|
  • 1
  • 2
  • 3
  • 4
  • 5
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/141152
推荐阅读
相关标签
  

闽ICP备14008679号