赞
踩
我们的业务数据基本都是在数据库中,如果需要离线同步到hdfs我们就需要使用dataX工具。使用dataX只需要学好json脚本,配置好数据源和路径就可以了。以下是我的一个mysql同步到HIve,以上的变量都可以通过传参统一一个脚本处理。
{ "job": { "setting": { "speed": { "channel": 3 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "${username}", "password": "${password}", "connection": [ { "jdbcUrl": [ "${jdbcUrl}" ], "querySql": [ "select id,create_time,update_time from ${sourceTableName} where update_time<'${endTime}' " ] } ] } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "string" }, { "name": "create_time", "type": "string" }, { "name": "update_time", "type": "string" } ], "isCompress": "${isCompress}", "defaultFS": "${hdfsPath}", "fieldDelimiter": "${fieldDelimiter}", "fileName": "${fileName}", "fileType": "${fileType}", "path": "${path}", "writeMode": "${writeMode}" } } } ] } }
很多人在问如何使用脚本变量如何传参。
首先我们要会使用dataX如何调用这个脚本。
python ${DATAX_HOME}/bin/datax.py -p"-DtargetDBName=$TARGET_DB_NAME -DtargetTableName=$TARGET_TABLE_NAME -DjdbcUrl=$MYSQL_URL -Dusername=$MYSQL_USERNAME -Dpassword=$MYSQL_PASSWD -DsourceTableName=$SOURCE_TABLE_NAME -DhdfsPath=$HDFS_PATH -DstartTime=${START_TIME} -DendTime=${END_TIME} -DisCompress=$ISCOMPRESS -DwriteMode=$WRITEMODE -DfileType=$FILETYPE -DfieldDelimiter='$FIELDDELIMITER' -DfileName=$TARGET_TABLE_NAME -Dpath=${PATH_HIVE}$TARGET_DB_NAME.db/$TARGET_TABLE_NAME/day=$DT_TIME" $DATAX_JSON_FILE;
在这个命令中你会发现将所有的变量都通过shell命令传递进去了。后续的这些变量传递我在更新。之所以这么多变量其主要是为了方便后续的脚本更新和调度运行。
对于开发人员只需要关心主要逻辑就行了。
有了这个基础脚本,我们就可以将HIVE上的一些功能一起合并到shell脚本中:
基于上面的考虑。封装dataX的调用脚本。
#!/bin/bash source /etc/profile DATAX_HOME="/home/data/datax" SHELL_PATH="/home/data/dw_datax" SCRIPT_PATH=${SHELL_PATH}/job DATAX_LOG=${SHELL_PATH}/logs/datax.log.`date "+%Y-%m-%d"` HDFS_PATH="hdfs://hdfs-cluster" #START_TIME=$(date -d "-1 day" +%Y-%m-%d) #END_TIME=$(date "+%Y-%m-%d") #DT_TIME=$(date -d "-1 day" +%Y%m%d) START_TIME="" END_TIME="" DT_TIME="" #失效日期 INVALID_DATE="" #失效天数 INVALID_DAYS=180 #是否清除失效数据:默认清除 IS_CLEAR_INVALID_DATA=1 #参数 ISCOMPRESS="false" WRITEMODE="nonConflict" FIELDDELIMITER="|" FILETYPE="orc" PATH_HIVE="/user/hive/warehouse/" MYSQL_URL="" #数据库用户名 MYSQL_USERNAME="admin" #数据库密码 MYSQL_PASSWD="123456" #默认同步目标库名 TARGET_DB_NAME="ods" #同步源库名 SOURCE_DB_NAME="" #同步源表名 SOURCE_TABLE_NAME="" #业务名称 BUSINESS_NAME="" #datax json文件 DATAX_JSON_FILE=/temp # 数据库实例信息 declare -A db_instance_conf # 数据库用户名 declare -A db_instance_user_conf # 数据库密码 declare -A db_instance_pwd_conf # 数据库实例与库映射关系 declare -A db_instance_maps # 初始化数据库实例配置 function initInstanceConf() { # 主业务线 ywx1 db_instance_conf["db_main_data"]="jdbc:mysql://192.168.1.1:3306/" db_instance_user_conf["db_main_data"]="admin" db_instance_pwd_conf["db_main_data"]="123456" # 业务线2 ywx2 db_instance_conf["db_data"]="jdbc:mysql://192.168.1.2:3306/" db_instance_user_conf["db_data"]="admin" db_instance_pwd_conf["db_data"]="123456" ... } # 初始化库和数据库实例映射关系 function initDbAndInstanceMaps() { #主业务线 db_instance_maps["ywx1_db_main"]="db_main_data" #业务线2 db_instance_maps["ywx2_db_data"]="db_data" #业务线3 db_instance_maps["ywx3_db_insurance"]="db_ywx3" ... ... db_instance_maps["dss_db_dss"]="db_dss" } #时间处理 传入参数 yyyy-mm-dd function DateProcess() { echo "日期时间为"$1 if echo $1 | grep -Eq "[0-9]{4}-[0-9]{2}-[0-9]{2}" && date -d $1 +%Y%m%d > /dev/null 2>&1 then : START_TIME=$(date -d $1 "+%Y-%m-%d") END_TIME=$(date -d "$1 +1 day" +%Y-%m-%d) DT_TIME=$(date -d $1 +"%Y%m%d") INVALID_DATE=$(date -d "$1 -$INVALID_DAYS day" +%Y%m%d) echo 时间正确: $START_TIME / $END_TIME / $DT_TIME / $INVALID_DATE; else echo "输入的日期格式不正确,应为yyyy-mm-dd"; exit 1; fi; } function DataConnect() { db_business_key="$BUSINESS_NAME""_""$SOURCE_DB_NAME" db_instance_key=${db_instance_maps["$db_business_key"]} echo $db_business_key $db_instance_key if [ ! -n "$db_instance_key" ]; then echo "当前数据库连接信息不存在,请确认业务和数据库连接是否正确或联系管理员添加" exit 1; fi db_instance_value=${db_instance_conf["$db_instance_key"]} MYSQL_USERNAME=${db_instance_user_conf["$db_instance_key"]} MYSQL_PASSWD=${db_instance_pwd_conf["$db_instance_key"]} echo $db_instance_value if [ ! -n "$db_instance_value" ]; then echo "当前数据库连接信息不存在,请确认业务和数据库连接是否正确或联系管理员添加" exit 1; fi MYSQL_URL="$db_instance_value$SOURCE_DB_NAME" } #每天运行 执行dataX function BaseDataxMysql2Hive() { #清除重复同步数据分区&新增分区 hive -e "ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME DROP IF EXISTS PARTITION(day='$DT_TIME');ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME ADD IF NOT EXISTS PARTITION (day='$DT_TIME')"; #执行同步 echo "开始执行同步" if ! python ${DATAX_HOME}/bin/datax.py -p"-DtargetDBName=$TARGET_DB_NAME -DtargetTableName=$TARGET_TABLE_NAME -DjdbcUrl=$MYSQL_URL -Dusername=$MYSQL_USERNAME -Dpassword=$MYSQL_PASSWD -DsourceTableName=$SOURCE_TABLE_NAME -DhdfsPath=$HDFS_PATH -DstartTime=${START_TIME} -DendTime=${END_TIME} -DisCompress=$ISCOMPRESS -DwriteMode=$WRITEMODE -DfileType=$FILETYPE -DfieldDelimiter='$FIELDDELIMITER' -DfileName=$TARGET_TABLE_NAME -Dpath=${PATH_HIVE}$TARGET_DB_NAME.db/$TARGET_TABLE_NAME/pt_day=$DT_TIME" $DATAX_JSON_FILE;then echo "command failed" exit 1; fi echo "同步结束" #删除定义的失效日期数据 if(($IS_CLEAR_INVALID_DATA==1));then echo "清除失效$INVALID_DATE天数的历史数据" hive -e "ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME DROP IF EXISTS PARTITION (pt_day<=${INVALID_DATE});" fi #同步分区元数据 #hive -e "ANALYZE TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME PARTITION (day=${DT_TIME}) COMPUTE STATISTICS;" #删除分区数据 } function parseArgs() { while getopts ":d:ab:s:m:f:t:n:u:p:" opt do case $opt in d) echo "参数d的值$OPTARG" DateProcess $OPTARG ;; a) IS_CLEAR_INVALID_DATA=0 echo "参数a的值$OPTARG" ;; b) echo "参数b的值$OPTARG" BUSINESS_NAME=$OPTARG ;; m) echo "参数m的值$OPTARG" SOURCE_DB_NAME=$OPTARG ;; s) echo "参数s的值$OPTARG" SOURCE_TABLE_NAME=$OPTARG ;; f) echo "参数f的值$OPTARG" DATAX_JSON_FILE=$OPTARG ;; n) echo "参数n的值$OPTARG" TARGET_DB_NAME=$OPTARG ;; t) echo "参数t的值$OPTARG" TARGET_TABLE_NAME=$OPTARG ;; u) echo "参数u的值$OPTARG" MYSQL_USERNAME=$OPTARG ;; p) echo "参数t的值$OPTARG" MYSQL_PASSWD=$OPTARG ;; ?) echo "未知参数" exit 1;; :) echo "没有输入任何选项 $OPTARG" ;; esac done } function judgeParams() { if [ ! -n "$DT_TIME" ] ;then echo "you have not input a etlDate! format {-d yyyy-mm-dd} " exit 1; fi if [ ! -n "$BUSINESS_NAME" ] ;then echo "you have not input a businessName! incloud(xxx,xxxx,x,xx) example {-b xxx}" exit 1; fi if [ ! -n "$SOURCE_DB_NAME" ] ;then echo "you have not input a sourceDB!" exit 1; fi if [ ! -n "$SOURCE_TABLE_NAME" ] ;then echo "you have not input a sourceTable example {-s user_info}!" exit 1; fi if [ ! -n "$DATAX_JSON_FILE" ] ;then echo "you have not input a dataxJson! example {-f ods_ywx1_user_info_di.json}" exit 1; fi if [ ! -n "$TARGET_TABLE_NAME" ] ;then echo "you have not input a targetTable! example {-t ods_ywx1_user_info_di}" exit 1; fi } function startsync() { #初始化数据库实例 initInstanceConf #初始化库和数据库实例映射关系 initDbAndInstanceMaps #解析参数 parseArgs "$@" #初始化数据链接 DataConnect #判断参数 judgeParams #同步数据 BaseDataxMysql2Hive } # -d: 处理时间 # -b:业务线 (ywx,ywx1,ywx1,...,ywxn) # -m:源数据库 # -a:增量数据不清除分区数据:默认清除 # -s:源数据表 # -n:目标数据库 # -t:目标数据表 # -f:datax同步json文件 # -p:密码 # -u:用户名 startsync "$@"
有了这个shell脚本,后续对于同步的一些同步完成功能的通知以及新功能都可以新增。同时又新形成了一个数据同步的规范性和开发的规范性。
有了上面的脚本,我们就可以只需要写好源表和目的表的名称。同时通过 -a 来区别增量还是全量同步进行处理。
#源表 -s
SOURCE_TABLE_NAME="user_info"
#目标表 -t
TARGET_TABLE_NAME="ods_main_user_info_df"
#datax 文件 -f
DATAX_FILE="${BASE_DIR_PATH}/ods_main_user_info_df.json"
ETL_DATE=${ETL_DATE}
BUSINESS_NAME=${BUSINESS_NAME}
SOURCE_DB_NAME=${SOURCE_DB_NAME}
#!/bin/bash
source /etc/profile
sh dataxsync.sh -d $ETL_DATE -b $BUSINESS_NAME -m $SOURCE_DB_NAME -s $SOURCE_TABLE_NAME -t $TARGET_TABLE_NAME -f $DATAX_FILE
我们可以发现这个脚本中包含了四个变量:
${BASE_DIR_PATH}
${ETL_DATE}
${BUSINESS_NAME}
${SOURCE_DB_NAME}
这几个变量主要是通过调度平台传入,
BASE_DIR_PATH:dataX脚本的统一地址,之所以弄这个目录主要是为了区分不同业务线。
ETL_DATE:每天同步的时间 : yyyy-mm-dd,同时我们可以再脚本上多增加几个时间,通过这个变量转换出来(yyyyMMdd, yyyyMMdd-1…)
BUSINESS_NAME:业务线的标识,我们也是可以用主题域区分。主要是用来识别数据库
SOURCE_DB_NAME:业务库的表名。
这一套的使用,我们搭配小海豚调度一起使用。定时运行,定时调度。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。