[Spark Data Warehouse Project] Requirement 8: Full and Incremental Imports from MySQL into Hive with DataX
This requirement simulates loading data from MySQL into the Hive data warehouse, with the data partitioned by date. Two import scenarios are tested: a full import, which loads every date partition at once, and a daily scheduled import, which loads only the current day's user data into that day's partition.
The MySQL source table:
create table t_order(
id int primary key auto_increment,
amt decimal(10,2),
`status` int default 0,
user_id int,
create_time timestamp DEFAULT CURRENT_TIMESTAMP,
modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
The Hive target table, partitioned by dt:
create table t_order(
id int,
amt decimal(10,2),
`status` int,
user_id int,
create_time date,
modify_time date
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
Note the timestamp columns; we will import data from the MySQL table above into the Hive table. The DataX job below performs the full import. Its writer path points at /user/hive/warehouse/test_hive.db/t_order, so it assumes the Hive table was created inside a database named test_hive.
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "connection": [ { "jdbcUrl": ["jdbc:mysql://hadoop10:3306/spark-dw"], "querySql": [ "select id,amt,status,user_id,create_time,modify_time from t_order" ] } ], "password": "0000", "username": "root", } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ {"name": "id","type": "int"}, {"name": "amt","type": "double"}, {"name": "status","type": "int"}, {"name": "user_id","type": "int"}, {"name": "create_time","type": "string"}, {"name": "modify_time","type": "string"} ], "defaultFS": "hdfs://hadoop10:8020", "fieldDelimiter": "\t", "fileName": "t_order", "fileType": "text", "path": "/user/hive/warehouse/test_hive.db/t_order/dt=$dt", "writeMode": "append" } } } ], "setting": { "speed": { "channel": "1" } } } }
Add test data in MySQL, then import the 2023-07-11 rows from MySQL into the Hive dt=2023-07-11 partition.
insert into t_order(amt,user_id) values(100,1001)
insert into t_order values(null,100,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39')
insert into t_order values(null,120,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39')
Create the partition in Hive:
alter table t_order add partition(dt='2023-07-11')
Run the DataX job:
python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-11" /opt/installs/datax/job/mysql2hive.json
This step loads the three rows just inserted into MySQL into Hive.
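To confirm the load, you can query the new partition from Hive. A minimal check (a sketch; it assumes the table lives in the test_hive database implied by the writer path and that the hive CLI is on the PATH):

# Sketch: print the rows DataX just wrote into the dt=2023-07-11 partition.
import subprocess

subprocess.run(
    ["hive", "-e", "select * from test_hive.t_order where dt='2023-07-11'"],
    check=True,
)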
Add more test data in MySQL, then import the 2023-07-12 rows from MySQL into the Hive dt=2023-07-12 partition.
insert into t_order values(null,200,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');
insert into t_order values(null,220,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');
Create the partition in Hive:
alter table t_order add partition(dt='2023-07-12')
Run the DataX job:
python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-12" /opt/installs/datax/job/mysql2hive.json
This run loads both the three rows inserted earlier and the two rows just inserted into Hive. The query results show that the data from the first step has been imported a second time; that is what a full import means.
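Counting rows per partition makes the overlap visible. A quick check (again a sketch against the assumed test_hive database):

# Sketch: count rows in each dt partition after the two full-import runs.
import subprocess

subprocess.run(
    ["hive", "-e", "select dt, count(*) from test_hive.t_order group by dt"],
    check=True,
)
# With the steps above, dt=2023-07-11 holds 3 rows and dt=2023-07-12 holds 5,
# because the second run re-imported the first three rows as well.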
General direction: fact tables use incremental import (e.g., the order table); dimension tables use full import (e.g., the product table).
The approach most companies adopt: full import as the primary method, with incremental import as a supplement.
One further requirement for incremental import is that the source business table must actually support it; there are two common ways to make that possible.
Option 1: rely on the id primary key. Query the largest id already present in the Hive table, then pull the MySQL rows whose id is greater than that value, as sketched below. If a table uses UUIDs instead of auto-increment ids this does not work, and the approach also cannot pick up rows that were merely updated.
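A minimal sketch of the id-based approach (illustration only, not part of the original project; it needs Python 3 with pymysql, the hive CLI on the PATH, and assumes the Hive table sits in the test_hive database):

# Sketch of id-based incremental extraction:
# 1) ask Hive for the largest id already loaded, 2) pull only newer rows from MySQL.
import subprocess
import pymysql

# Step 1: highest id currently in the Hive table (0 if it is empty);
# the value is the last line that `hive -e` prints to stdout.
out = subprocess.run(
    ["hive", "-e", "select coalesce(max(id), 0) from test_hive.t_order"],
    capture_output=True, text=True, check=True,
).stdout.strip()
max_id = int(out.splitlines()[-1])

# Step 2: fetch only the rows with a larger id from MySQL.
conn = pymysql.connect(user='root', password='0000', host='hadoop10')
cursor = conn.cursor()
cursor.execute(
    "select id,amt,status,user_id,create_time,modify_time from `spark-dw`.t_order where id > %s",
    [max_id])
rows = cursor.fetchall()
cursor.close()
conn.close()
print(len(rows), "new rows with id >", max_id)

In a real pipeline the max_id would be substituted into the DataX querySql instead of being fetched in Python, for example through an extra -D job parameter, the same way $dt is filled in above.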
Option 2: add a modify_time column to the table. Whenever a row is inserted or updated this column is refreshed, so the rows changed on a given day can be selected by this column and extracted into Hive, which is what the job below does.
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "connection": [ { "jdbcUrl": ["jdbc:mysql://hadoop10:3306/spark-dw"], "querySql": [ "select id,amt,status,user_id,create_time,modify_time from t_order where date_format(modify_time,'%Y-%m-%d') = '$dt'" ] } ], "password": "0000", "username": "root", } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ {"name": "id","type": "int"}, {"name": "amt","type": "double"}, {"name": "status","type": "int"}, {"name": "user_id","type": "int"}, {"name": "create_time","type": "string"}, {"name": "modify_time","type": "string"} ], "defaultFS": "hdfs://hadoop10:8020", "fieldDelimiter": "\t", "fileName": "t_order", "fileType": "text", "path": "/user/hive/warehouse/test_hive.db/t_order/dt=$dt", "writeMode": "append" } } } ], "setting": { "speed": { "channel": "1" } } } }
Running this incremental job with the desired date imports, for each partition date, only the MySQL rows modified on that day into Hive.
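As described at the start, this job would normally run once a day for the previous day's partition. A minimal daily wrapper (a sketch: the datax.py path matches the commands above, but the file name mysql2hive_inc.json is an assumption, since the original does not name the incremental job file):

# Sketch: run the incremental t_order job for yesterday's partition.
import subprocess
from datetime import date, timedelta

dt = (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")

# Create the partition if it is missing (assumes the test_hive database).
subprocess.run(
    ["hive", "-e", f"alter table test_hive.t_order add if not exists partition(dt='{dt}')"],
    check=True,
)

# mysql2hive_inc.json is an assumed name for the incremental job JSON shown above.
subprocess.run(
    ["python", "/opt/installs/datax/bin/datax.py",
     "-p", f"-Ddt={dt}",
     "/opt/installs/datax/job/mysql2hive_inc.json"],
    check=True,
)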
To avoid hand-writing one job JSON per table, the next part prepares two more MySQL test tables, creates matching ODS tables in Hive, and generates the DataX job files with a Python script. The MySQL test tables and data:

create table t_student(
  id int PRIMARY key,
  name varchar(50),
  `age` int
);

create table t_person(
  id int PRIMARY key,
  name varchar(50),
  parentid int
);

INSERT into t_student values (1,'zhanmusi',15), (2,'lisi',55), (3,'lisi',66);
INSERT into t_person values (1,'miky',06), (2,'tom',16), (3,'jakcon',26);

The matching ODS tables in Hive:
create table ods_t_student(
id int,
name string,
`age` int
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
create table ods_t_person(
id int,
name string,
parentid int
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
Writing one job JSON per table by hand gets tedious, so the following Python script reads a table's column metadata from MySQL's information_schema and generates the DataX mysqlreader-to-hdfswriter job file automatically:

import json
import sys

import pymysql


def gen_json(dbname, tablename):
    # Build the DataX job dict for one MySQL table and write it to a JSON file.
    s1 = {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "connection": [
                                {
                                    "jdbcUrl": ["jdbc:mysql://hadoop10:3306/" + dbname + "?useSSL=false"],
                                    "table": [tablename]
                                }
                            ],
                            "password": "0000",  # MySQL password
                            "username": "root",
                            "column": getColumn(dbname, tablename)
                        }
                    },
                    "writer": {
                        "name": "hdfswriter",
                        "parameter": {
                            "column": getColumnAndType(dbname, tablename),
                            "defaultFS": "hdfs://hadoop10:8020",  # HDFS NameNode address/port
                            "fileType": "text",
                            "path": "/user/hive/warehouse/ods_" + tablename + "/dt=$dt",
                            "fieldDelimiter": "\t",
                            "fileName": tablename,
                            "writeMode": "append"
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": "1"
                }
            }
        }
    }
    with open('d:/test/' + tablename + '.json', 'w') as f:
        json.dump(s1, f)


def queryDataBase(dbname, tablename):
    # Fetch (column_name, data_type) pairs for the table from information_schema.
    conn = pymysql.connect(user='root', password='0000', host='hadoop10')  # MySQL password
    cursor = conn.cursor()
    cursor.execute(
        "select column_name, data_type from information_schema.`COLUMNS` "
        "where TABLE_SCHEMA = %s and table_name = %s order by ordinal_position",
        [dbname, tablename])
    fetchall = cursor.fetchall()
    cursor.close()
    conn.close()
    return fetchall


def getColumn(dbname, tablename):
    # Column names only, for the mysqlreader side.
    k1 = queryDataBase(dbname, tablename)
    k2 = list(map(lambda x: x[0], k1))
    return k2


def getColumnAndType(dbname, tablename):
    # Column names plus hdfswriter types, for the writer side.
    k1 = queryDataBase(dbname, tablename)
    mappings = {
        'bigint': 'bigint',
        'varchar': 'string',
        'int': 'int',
        'datetime': 'string',
        'text': 'string'
    }
    k2 = list(map(lambda x: {"name": x[0], "type": mappings[x[1].lower()]}, k1))
    return k2


if __name__ == '__main__':
    l = sys.argv[1:]
    dbname = l[0]  # MySQL database name
    tablename = l[1]  # table name
    gen_json(dbname, tablename)
Generate a job JSON for each of the two tables (run from the development machine):
(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_student
(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_person
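One caveat about the generator: its mappings dict only knows bigint, varchar, int, datetime and text, so any other MySQL column type (for instance the decimal and timestamp columns of t_order earlier) would raise a KeyError. A hedged drop-in replacement for getColumnAndType that widens the mapping and falls back to string (the decimal-to-double and timestamp-to-string choices simply mirror the hand-written job above):

# Sketch: broader MySQL-type -> hdfswriter-type mapping with a string fallback,
# so unexpected column types do not crash the generator.
mappings = {
    'tinyint': 'int', 'smallint': 'int', 'int': 'int', 'bigint': 'bigint',
    'float': 'double', 'double': 'double', 'decimal': 'double',
    'char': 'string', 'varchar': 'string', 'text': 'string',
    'date': 'string', 'datetime': 'string', 'timestamp': 'string',
}

def getColumnAndType(dbname, tablename):
    columns = queryDataBase(dbname, tablename)
    # Fall back to string for any type not listed above.
    return [{"name": name, "type": mappings.get(dtype.lower(), 'string')}
            for name, dtype in columns]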
Copy the generated t_student.json and t_person.json to /opt/installs/datax/job on the server, then drive both imports with a shell script that creates the day's partition when it is missing:

#!/bin/bash
# Default to yesterday's date when no argument is given.
dt=$1
if [ -z "$1" ]
then
    dt=$(date -d yesterday +%Y-%m-%d)
fi
echo $dt

# If "show partitions" returns only the "partition" header (no rows),
# the dt partition is missing, so create it before running DataX.
s=$(hive -e "show partitions ods_t_student partition(dt='$dt')")
echo === $s ====
if [ "$s" == "partition" ]
then
    hive -e "alter table ods_t_student add partition(dt='$dt')"
else
    echo "partition $dt already exists"
fi
python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/t_student.json

# Same check and import for ods_t_person.
s=$(hive -e "show partitions ods_t_person partition(dt='$dt')")
echo === $s ====
if [ "$s" == "partition" ]
then
    hive -e "alter table ods_t_person add partition(dt='$dt')"
else
    echo "partition $dt already exists"
fi
python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/t_person.json
[root@hadoop10 app]# sh b.sh 2023-07-13
Each DataX job ends with a summary:
Job start time          : 2023-07-13 02:31:38
Job end time            : 2023-07-13 02:31:50
Total job time          : 12s
Average traffic         : 2B/s
Record write speed      : 0rec/s
Total records read      : 3
Total read/write errors : 0
Querying ods_t_student in Hive shows the rows loaded into the dt=2023-07-13 partition:
id|name    |age|dt        |
--|--------|---|----------|
1|zhanmusi| 15|2023-07-13|
2|lisi | 55|2023-07-13|
3|lisi | 66|2023-07-13|