赞
踩
null值 在hive和mysql里的存储格式不一样,需要告诉DataX应该如何转换。
12.1.1 创建数据库
CREATE DATABASE IF NOT EXISTS gmall_report DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
建mysql表的,
1字段个数要和hive中的ads层数据保持一致,
2字段类型要和hive对等替换,
3字段顺序也要一致
每张表要有主键
1)各活动补贴率
dt activity_id activity_name 三个主键联合而成
DROP TABLE IF EXISTS `ads_activity_stats`;
CREATE TABLE `ads_activity_stats` (
`dt` date NOT NULL COMMENT '统计日期',
`activity_id` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '活动ID',
`activity_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动名称',
`start_date` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动开始日期',
`reduce_rate` decimal(16, 2) NULL DEFAULT NULL COMMENT '补贴率',
PRIMARY KEY (`dt`, `activity_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '活动统计' ROW_FORMAT = Dynamic;
DataX配置文件生成脚本
方便起见,此处提供了DataX配置文件批量生成脚本,脚本内容及使用方式如下。
1)在~/bin目录下创建gen_export_config.py脚本
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.py
脚本内容如下
# coding=utf-8 import json import getopt import os import sys import MySQLdb #MySQL相关配置,需根据实际情况作出修改 mysql_host = "hadoop102" mysql_port = "3306" mysql_user = "root" mysql_passwd = "000000" #HDFS NameNode相关配置,需根据实际情况作出修改 hdfs_nn_host = "hadoop102" hdfs_nn_port = "8020" #生成配置文件的目标路径,可根据实际情况作出修改 output_path = "/opt/module/datax/job/export" def get_connection(): return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd) def get_mysql_meta(database, table): connection = get_connection() cursor = connection.cursor() sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION" cursor.execute(sql, [database, table]) fetchall = cursor.fetchall() cursor.close() connection.close() return fetchall def get_mysql_columns(database, table): return map(lambda x: x[0], get_mysql_meta(database, table)) def generate_json(target_database, target_table): job = { "job": { "setting": { "speed": { "channel": 3 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [{ "reader": { "name": "hdfsreader", "parameter": { "path": "${exportdir}", "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port, "column": ["*"], "fileType": "text", "encoding": "UTF-8", "fieldDelimiter": "\t", "nullFormat": "\\N" } }, "writer": { "name": "mysqlwriter", "parameter": { "writeMode": "replace", "username": mysql_user, "password": mysql_passwd, "column": get_mysql_columns(target_database, target_table), "connection": [ { "jdbcUrl": "jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + target_database + "?useUnicode=true&characterEncoding=utf-8", "table": [target_table] } ] } } }] } } if not os.path.exists(output_path): os.makedirs(output_path) with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f: json.dump(job, f) def main(args): target_database = "" target_table = "" options, arguments = getopt.getopt(args, '-d:-t:', ['targetdb=', 'targettbl=']) for opt_name, opt_value in options: if opt_name in ('-d', '--targetdb'): target_database = opt_value if opt_name in ('-t', '--targettbl'): target_table = opt_value generate_json(target_database, target_table) if __name__ == '__main__': main(sys.argv[1:])
在~/bin目录下创建gen_export_config.sh脚本
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.sh
脚本内容如下。
#!/bin/bash python ~/bin/gen_export_config.py -d gmall_report -t ads_activity_stats python ~/bin/gen_export_config.py -d gmall_report -t ads_coupon_stats python ~/bin/gen_export_config.py -d gmall_report -t ads_new_buyer_stats python ~/bin/gen_export_config.py -d gmall_report -t ads_order_by_province python ~/bin/gen_export_config.py -d gmall_report -t ads_page_path python ~/bin/gen_export_config.py -d gmall_report -t ads_repeat_purchase_by_tm python ~/bin/gen_export_config.py -d gmall_report -t ads_sku_cart_num_top3_by_cate python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_cate python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_tm python ~/bin/gen_export_config.py -d gmall_report -t ads_traffic_stats_by_channel python ~/bin/gen_export_config.py -d gmall_report -t ads_user_action python ~/bin/gen_export_config.py -d gmall_report -t ads_user_change python ~/bin/gen_export_config.py -d gmall_report -t ads_user_retention python ~/bin/gen_export_config.py -d gmall_report -t ads_user_stats
3)为gen_export_config.sh脚本增加执行权限
[atguigu@hadoop102 bin]$ chmod +x ~/bin/gen_export_config.sh
4)执行gen_export_config.sh脚本,生成配置文件
[atguigu@hadoop102 bin]$ gen_export_config.sh
5)观察生成的配置文件
[atguigu@hadoop102 bin]$ ls /opt/module/datax/job/export/
编写每日导出脚本
(1)在hadoop102的/home/atguigu/bin目录下创建hdfs_to_mysql.sh
[atguigu@hadoop102 bin]$ vim hdfs_to_mysql.sh
(2)编写如下内容
#! /bin/bash DATAX_HOME=/opt/module/datax #DataX导出路径不允许存在空文件,该函数作用为清理空文件 handle_export_path(){ target_file=$1 for i in `hadoop fs -ls -R $target_file | awk '{print $8}'`; do hadoop fs -test -z $i if [[ $? -eq 0 ]]; then echo "$i文件大小为0,正在删除" hadoop fs -rm -r -f $i fi done } #数据导出 export_data() { datax_config=$1 export_dir=$2 hadoop fs -test -e $export_dir if [[ $? -eq 0 ]] then handle_export_path $export_dir file_count=$(hadoop fs -ls $export_dir | wc -l) if [ $file_count -gt 0 ] then set -e; $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_config set +e; else echo "$export_dir 目录为空,跳过~" fi else echo "路径 $export_dir 不存在,跳过~" fi } case $1 in "ads_new_buyer_stats") export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats ;; "ads_order_by_province") export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province ;; "ads_page_path") export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path ;; "ads_repeat_purchase_by_tm") export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm ;; "ads_trade_stats") export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats ;; "ads_trade_stats_by_cate") export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate ;; "ads_trade_stats_by_tm") export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm ;; "ads_traffic_stats_by_channel") export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel ;; "ads_user_action") export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action ;; "ads_user_change") export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change ;; "ads_user_retention") export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention ;; "ads_user_stats") export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats ;; "ads_activity_stats") export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats ;; "ads_coupon_stats") export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats ;; "ads_sku_cart_num_top3_by_cate") export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate ;; "all") export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate ;; esac
(3)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod +x hdfs_to_mysql.sh
(4)脚本用法
[atguigu@hadoop102 bin]$ hdfs_to_mysql.sh all
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。