当前位置:   article > 正文

dataX同步mysql至hive_datax hive

datax hive

1、前言

我们的业务数据基本都是在数据库中,如果需要离线同步到hdfs我们就需要使用dataX工具。使用dataX只需要学好json脚本,配置好数据源和路径就可以了。以下是我的一个mysql同步到HIve,以上的变量都可以通过传参统一一个脚本处理。

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "${username}",
                        "password": "${password}",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "${jdbcUrl}"
                                ],
                                "querySql": [
                                    "select id,create_time,update_time from ${sourceTableName} where update_time<'${endTime}' "                                  
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "string"
                            },
                            {
                                "name": "create_time",
                                "type": "string"
                            },
                            {
                                "name": "update_time",
                                "type": "string"
                            }
                        ],
                        "isCompress": "${isCompress}",
                        "defaultFS": "${hdfsPath}",
                        "fieldDelimiter": "${fieldDelimiter}",
                        "fileName": "${fileName}",
                        "fileType": "${fileType}",
                        "path": "${path}",
                        "writeMode": "${writeMode}"
                    }
                }
            }
        ]
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

2、参数调用和传参

很多人在问如何使用脚本变量如何传参。
首先我们要会使用dataX如何调用这个脚本。

python ${DATAX_HOME}/bin/datax.py -p"-DtargetDBName=$TARGET_DB_NAME -DtargetTableName=$TARGET_TABLE_NAME  -DjdbcUrl=$MYSQL_URL -Dusername=$MYSQL_USERNAME -Dpassword=$MYSQL_PASSWD -DsourceTableName=$SOURCE_TABLE_NAME -DhdfsPath=$HDFS_PATH -DstartTime=${START_TIME} -DendTime=${END_TIME} -DisCompress=$ISCOMPRESS -DwriteMode=$WRITEMODE -DfileType=$FILETYPE -DfieldDelimiter='$FIELDDELIMITER' -DfileName=$TARGET_TABLE_NAME  -Dpath=${PATH_HIVE}$TARGET_DB_NAME.db/$TARGET_TABLE_NAME/day=$DT_TIME" $DATAX_JSON_FILE;
  • 1

在这个命令中你会发现将所有的变量都通过shell命令传递进去了。后续的这些变量传递我在更新。之所以这么多变量其主要是为了方便后续的脚本更新和调度运行。
对于开发人员只需要关心主要逻辑就行了。
有了这个基础脚本,我们就可以将HIVE上的一些功能一起合并到shell脚本中:

  • 增量同步,保留全部数据。
  • 全量同步,全量同步只保留固定周期的历史全量。
  • 刷新元数据。
  • 通知更新成功。
  • 多个mysql业务库的匹配。
  • 生产业务库密码的保护。

3、封装shell调用脚本

基于上面的考虑。封装dataX的调用脚本。

#!/bin/bash
source /etc/profile
DATAX_HOME="/home/data/datax"
SHELL_PATH="/home/data/dw_datax"
SCRIPT_PATH=${SHELL_PATH}/job
DATAX_LOG=${SHELL_PATH}/logs/datax.log.`date "+%Y-%m-%d"`
HDFS_PATH="hdfs://hdfs-cluster"
#START_TIME=$(date -d "-1 day" +%Y-%m-%d)
#END_TIME=$(date "+%Y-%m-%d")
#DT_TIME=$(date -d "-1 day" +%Y%m%d)
START_TIME=""
END_TIME=""
DT_TIME=""
#失效日期
INVALID_DATE=""
#失效天数
INVALID_DAYS=180
#是否清除失效数据:默认清除
IS_CLEAR_INVALID_DATA=1

#参数
ISCOMPRESS="false"
WRITEMODE="nonConflict"
FIELDDELIMITER="|"
FILETYPE="orc"
PATH_HIVE="/user/hive/warehouse/"
MYSQL_URL=""
#数据库用户名
MYSQL_USERNAME="admin"
#数据库密码
MYSQL_PASSWD="123456"
#默认同步目标库名
TARGET_DB_NAME="ods"
#同步源库名
SOURCE_DB_NAME=""
#同步源表名
SOURCE_TABLE_NAME=""
#业务名称
BUSINESS_NAME=""
#datax json文件
DATAX_JSON_FILE=/temp


# 数据库实例信息
declare -A db_instance_conf
# 数据库用户名
declare -A db_instance_user_conf
# 数据库密码
declare -A db_instance_pwd_conf
# 数据库实例与库映射关系
declare -A db_instance_maps

# 初始化数据库实例配置
function initInstanceConf()
{
        # 主业务线 ywx1
        db_instance_conf["db_main_data"]="jdbc:mysql://192.168.1.1:3306/"
        db_instance_user_conf["db_main_data"]="admin"
        db_instance_pwd_conf["db_main_data"]="123456"
         # 业务线2 ywx2
        db_instance_conf["db_data"]="jdbc:mysql://192.168.1.2:3306/"
        db_instance_user_conf["db_data"]="admin"
        db_instance_pwd_conf["db_data"]="123456"
        ...
        
}

# 初始化库和数据库实例映射关系
function initDbAndInstanceMaps()
{
        #主业务线
        db_instance_maps["ywx1_db_main"]="db_main_data"
        
        #业务线2
        db_instance_maps["ywx2_db_data"]="db_data"
        #业务线3
        db_instance_maps["ywx3_db_insurance"]="db_ywx3"
        
        ...
        
        ...
        
        db_instance_maps["dss_db_dss"]="db_dss"
		
}

#时间处理  传入参数 yyyy-mm-dd
function DateProcess()
{
echo "日期时间为"$1
if echo $1 | grep -Eq "[0-9]{4}-[0-9]{2}-[0-9]{2}" && date -d $1 +%Y%m%d > /dev/null 2>&1
  then :
        START_TIME=$(date -d $1 "+%Y-%m-%d")
        END_TIME=$(date -d "$1 +1 day" +%Y-%m-%d)
        DT_TIME=$(date -d $1 +"%Y%m%d")
        INVALID_DATE=$(date -d "$1 -$INVALID_DAYS day" +%Y%m%d)
        echo 时间正确: $START_TIME / $END_TIME / $DT_TIME / $INVALID_DATE;
else
  echo "输入的日期格式不正确,应为yyyy-mm-dd";
  exit 1;
fi;

}

function DataConnect()
{
        db_business_key="$BUSINESS_NAME""_""$SOURCE_DB_NAME"
        db_instance_key=${db_instance_maps["$db_business_key"]}
        echo $db_business_key $db_instance_key
        if [ ! -n "$db_instance_key" ]; then
                echo "当前数据库连接信息不存在,请确认业务和数据库连接是否正确或联系管理员添加"
                exit 1;
        fi
        db_instance_value=${db_instance_conf["$db_instance_key"]}
	MYSQL_USERNAME=${db_instance_user_conf["$db_instance_key"]}
        MYSQL_PASSWD=${db_instance_pwd_conf["$db_instance_key"]}
	echo $db_instance_value
        if [ ! -n "$db_instance_value" ]; then
                echo "当前数据库连接信息不存在,请确认业务和数据库连接是否正确或联系管理员添加"
                exit 1;
        fi
        MYSQL_URL="$db_instance_value$SOURCE_DB_NAME"
}

#每天运行 执行dataX
function BaseDataxMysql2Hive()
{
        #清除重复同步数据分区&新增分区
        hive -e "ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME DROP IF EXISTS PARTITION(day='$DT_TIME');ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME ADD IF NOT EXISTS PARTITION (day='$DT_TIME')";
        #执行同步
        echo "开始执行同步"
    if ! python ${DATAX_HOME}/bin/datax.py -p"-DtargetDBName=$TARGET_DB_NAME -DtargetTableName=$TARGET_TABLE_NAME  -DjdbcUrl=$MYSQL_URL -Dusername=$MYSQL_USERNAME -Dpassword=$MYSQL_PASSWD -DsourceTableName=$SOURCE_TABLE_NAME -DhdfsPath=$HDFS_PATH -DstartTime=${START_TIME} -DendTime=${END_TIME} -DisCompress=$ISCOMPRESS -DwriteMode=$WRITEMODE -DfileType=$FILETYPE -DfieldDelimiter='$FIELDDELIMITER' -DfileName=$TARGET_TABLE_NAME  -Dpath=${PATH_HIVE}$TARGET_DB_NAME.db/$TARGET_TABLE_NAME/pt_day=$DT_TIME" $DATAX_JSON_FILE;then
        echo "command failed"
        exit 1;
        fi

    echo "同步结束"
    #删除定义的失效日期数据
    if(($IS_CLEAR_INVALID_DATA==1));then
        echo "清除失效$INVALID_DATE天数的历史数据"
        hive -e "ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME DROP IF EXISTS PARTITION (pt_day<=${INVALID_DATE});"
        fi
    #同步分区元数据
        #hive -e "ANALYZE TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME PARTITION (day=${DT_TIME}) COMPUTE STATISTICS;"
        #删除分区数据
}

function parseArgs()
{
        while getopts ":d:ab:s:m:f:t:n:u:p:" opt
        do
            case $opt in
                d)
            echo "参数d的值$OPTARG"
                DateProcess $OPTARG
                ;;
                a)
                IS_CLEAR_INVALID_DATA=0
                echo "参数a的值$OPTARG"
                ;;
                b)
                echo "参数b的值$OPTARG"
                BUSINESS_NAME=$OPTARG
                ;;
                m)
                echo "参数m的值$OPTARG"
                SOURCE_DB_NAME=$OPTARG
                ;;
                s)
                echo "参数s的值$OPTARG"
                SOURCE_TABLE_NAME=$OPTARG
                ;;
                f)
                echo "参数f的值$OPTARG"
                DATAX_JSON_FILE=$OPTARG
                ;;
                n)
                echo "参数n的值$OPTARG"
                TARGET_DB_NAME=$OPTARG
                ;;
                t)
                echo "参数t的值$OPTARG"
                TARGET_TABLE_NAME=$OPTARG
                ;;
                u)
                echo "参数u的值$OPTARG"
                MYSQL_USERNAME=$OPTARG
                ;;
                p)
                echo "参数t的值$OPTARG"
                MYSQL_PASSWD=$OPTARG
                ;;
                ?)
                echo "未知参数"
                exit 1;;
                :)
            echo "没有输入任何选项 $OPTARG"
            ;;
        esac done
}

function judgeParams()
{
        if  [ ! -n "$DT_TIME" ] ;then
            echo "you have not input a etlDate! format {-d yyyy-mm-dd} "
            exit 1;
        fi

        if  [ ! -n "$BUSINESS_NAME" ] ;then
            echo "you have not input a businessName! incloud(xxx,xxxx,x,xx) example {-b xxx}"
            exit 1;
        fi

        if  [ ! -n "$SOURCE_DB_NAME" ] ;then
            echo "you have not input a sourceDB!"
            exit 1;
        fi

        if  [ ! -n "$SOURCE_TABLE_NAME" ] ;then
            echo "you have not input a sourceTable example {-s user_info}!"
            exit 1;
        fi

        if  [ ! -n "$DATAX_JSON_FILE" ] ;then
            echo "you have not input a dataxJson! example {-f ods_ywx1_user_info_di.json}"
            exit 1;
        fi
        if  [ ! -n "$TARGET_TABLE_NAME" ] ;then
            echo "you have not input a targetTable! example {-t ods_ywx1_user_info_di}"
            exit 1;
        fi
}


function startsync()
{

        #初始化数据库实例
        initInstanceConf
        #初始化库和数据库实例映射关系
        initDbAndInstanceMaps
        #解析参数
        parseArgs "$@"
        #初始化数据链接
        DataConnect
        #判断参数
        judgeParams
        #同步数据
        BaseDataxMysql2Hive
}

# -d: 处理时间
# -b:业务线 (ywx,ywx1,ywx1,...,ywxn)
# -m:源数据库
# -a:增量数据不清除分区数据:默认清除
# -s:源数据表
# -n:目标数据库
# -t:目标数据表
# -f:datax同步json文件
# -p:密码
# -u:用户名

startsync "$@"

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264

有了这个shell脚本,后续对于同步的一些同步完成功能的通知以及新功能都可以新增。同时又新形成了一个数据同步的规范性和开发的规范性。

4、调度平台调度脚本

有了上面的脚本,我们就可以只需要写好源表和目的表的名称。同时通过 -a 来区别增量还是全量同步进行处理。

#源表 -s
SOURCE_TABLE_NAME="user_info"
#目标表 -t
TARGET_TABLE_NAME="ods_main_user_info_df"
#datax 文件 -f
DATAX_FILE="${BASE_DIR_PATH}/ods_main_user_info_df.json"
ETL_DATE=${ETL_DATE}
BUSINESS_NAME=${BUSINESS_NAME}
SOURCE_DB_NAME=${SOURCE_DB_NAME}
#!/bin/bash
source /etc/profile
sh dataxsync.sh -d $ETL_DATE -b $BUSINESS_NAME -m $SOURCE_DB_NAME -s $SOURCE_TABLE_NAME -t $TARGET_TABLE_NAME -f $DATAX_FILE
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

我们可以发现这个脚本中包含了四个变量:
${BASE_DIR_PATH}
${ETL_DATE}
${BUSINESS_NAME}
${SOURCE_DB_NAME}

这几个变量主要是通过调度平台传入,
BASE_DIR_PATH:dataX脚本的统一地址,之所以弄这个目录主要是为了区分不同业务线。
ETL_DATE:每天同步的时间 : yyyy-mm-dd,同时我们可以再脚本上多增加几个时间,通过这个变量转换出来(yyyyMMdd, yyyyMMdd-1…)
BUSINESS_NAME:业务线的标识,我们也是可以用主题域区分。主要是用来识别数据库
SOURCE_DB_NAME:业务库的表名。
这一套的使用,我们搭配小海豚调度一起使用。定时运行,定时调度。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/141118
推荐阅读
相关标签
  

闽ICP备14008679号