Searching online turns up plenty of articles about importing MongoDB data into Hive, but very few about exporting from Hive to MongoDB. Recently a sister team building a user-profile system chose MongoDB as its storage layer (it makes field extension easy), and they need data from certain tables in our data warehouse. That raised the question: how do we export Hive table data to MongoDB?
We normally use Sqoop to move data between MySQL and HDFS, but Sqoop is geared toward relational databases and has poor support for NoSQL stores such as MongoDB, which led us to Alibaba's DataX. After some research, DataX does indeed handle this scenario.
【Exporting Hive Table Data to MongoDB with DataX】
1) Prerequisites
- Linux
- JDK (1.8 or later; 1.8 recommended)
- Python (2.6.x recommended)
2) Installation
Download address:
http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
cd /opt/software
-- upload datax.tar.gz here, then extract it
tar -zxvf datax.tar.gz -C /opt/module/
cd /opt/module/datax/bin/
python datax.py /opt/module/datax/job/job.json
If the bundled sample job above runs to completion, the installation works.
Optionally register DataX in the environment (e.g. append to /etc/profile):
#DATAX_HOME
export DATAX_HOME=/opt/module/datax
export PATH=$PATH:$DATAX_HOME/bin
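A quick way to confirm the change took effect (assuming the exports above were appended to /etc/profile):
# reload the profile and confirm the variables are visible
source /etc/profile
echo $DATAX_HOME          # should print /opt/module/datax
ls $DATAX_HOME/bin/datax.py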
DataX can print a configuration template for any reader/writer pair via the -r and -w flags:
python bin/datax.py -r hdfsreader -w mongodbwriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the hdfsreader document:
    https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md

Please refer to the mongodbwriter document:
    https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md

Please save the following configuration as a json file and use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [],
                        "defaultFS": "",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "orc",
                        "path": ""
                    }
                },
                "writer": {
                    "name": "mongodbwriter",
                    "parameter": {
                        "address": [],
                        "collectionName": "",
                        "column": [],
                        "dbName": "",
                        "upsertInfo": {
                            "isUpsert": "",
                            "upsertKey": ""
                        },
                        "userName": "",
                        "userPassword": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
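Any value in the job JSON can be written as a ${var} placeholder and bound at submit time through datax.py's -p flag; the generator script below relies on this for the HDFS path. A minimal sketch (the path and file name here are illustrative):
# -p passes -Dkey=value pairs into the job; ${exportdir} in the
# reader's "path" is replaced with the value supplied here
python /opt/module/datax/bin/datax.py \
    -p"-Dexportdir=/warehouse/demo/dwd/demo_table/dt=2022-04-19" \
    /opt/module/datax/job/export/demo.json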
Config-file generator script: gen_export_config.py
(the -d / -t arguments become the MongoDB dbName / collectionName and the name of the generated file)
# coding=utf-8
# Usage: python gen_export_config.py -d <target_database> -t <target_table>
import json
import getopt
import os
import sys

# MongoDB connection settings; adjust to your environment
mongodb_host = "xxxx.mongodb.rds.aliyuncs.com"
mongodb_port = "3717"
mongodb_user = "xxx"
mongodb_passwd = "xxx"

# HDFS NameNode (HA) settings live in the hadoopConfig block below;
# adjust the nameservice and addresses to your cluster

# Target directory for the generated config files; adjust as needed
output_path = "/opt/module/datax/job/export"


def generate_json(target_database, target_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        # ${exportdir} is bound at submit time via datax.py -p
                        "path": "${exportdir}",
                        "defaultFS": "hdfs://mycluster",
                        # HA client config: one rpc-address entry per NameNode
                        "hadoopConfig": {
                            "dfs.nameservices": "mycluster",
                            "dfs.ha.namenodes.mycluster": "nn1,nn2,nn3",
                            "dfs.namenode.rpc-address.mycluster.nn1": "xxx",
                            "dfs.namenode.rpc-address.mycluster.nn2": "xxx",
                            "dfs.namenode.rpc-address.mycluster.nn3": "xxx",
                            "dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "column": ["*"],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "nullFormat": "\\N"
                    }
                },
                "writer": {
                    "name": "mongodbwriter",
                    "parameter": {
                        "address": ["{0}:{1}".format(mongodb_host, mongodb_port)],
                        "collectionName": target_table,
                        "column": [
                            {"name": "id", "type": "string"},
                            {"name": "channel", "type": "string"},
                            {"name": "platform", "type": "string"}
                        ],
                        "dbName": target_database,
                        "writeMode": "replace",
                        "userName": mongodb_user,
                        "userPassword": mongodb_passwd
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    # the generated file is named <database>.<table>.json
    with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    target_database = ""
    target_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['targetdb=', 'targettbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--targetdb'):
            target_database = opt_value
        if opt_name in ('-t', '--targettbl'):
            target_table = opt_value

    generate_json(target_database, target_table)


if __name__ == '__main__':
    main(sys.argv[1:])
-- Install the Python MongoDB driver (needed on all three machines)
sudo yum install -y pymongo
-- Generate the DataX sync config file
python gen_export_config.py -d xxx -t xxx
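An optional sanity check (the file name follows the database.table.json pattern the script uses; the xxx parts depend on your arguments):
# pretty-print the generated file to confirm it is valid JSON
python -m json.tool /opt/module/datax/job/export/xxx.xxx.json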
Wrapper script: gen_export_config.sh
vim ~/bin/gen_export_config.sh
#!/bin/bash

python ~/bin/gen_export_config.py -d xxx -t xxx
Add execute permission:
chmod +x ~/bin/gen_export_config.sh
-- Generate the config file
gen_export_config.sh
Run the export. The -p flag binds the ${exportdir} placeholder in the config to a concrete HDFS partition path:
python /opt/module/datax/bin/datax.py -p"-Dexportdir=/warehouse/xxx/dwd/xxx/dt=2022-04-19" /opt/module/datax/job/export/xxx.xxx.json
Checking the target collection in MongoDB shows the data has been synced.
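A quick way to eyeball the result from the shell (host, credentials, database and collection below mirror the placeholders used earlier; add --authenticationDatabase if your deployment requires it):
# count the documents in the target collection
mongo --host xxxx.mongodb.rds.aliyuncs.com:3717 -u xxx -p xxx \
    --eval 'db.getSiblingDB("xxx").getCollection("xxx").count()'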
Finally, wrap the export in a shell script that can be run daily:
vim hdfs_to_mongodb.sh
#!/bin/bash

DATAX_HOME=/opt/module/datax

# If a date is passed as the second argument, use it;
# otherwise default to the day before today
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

# The DataX export path must not contain empty files;
# this function deletes any zero-length files under the given path
handle_export_path(){
    for i in `hadoop fs -ls -R $1 | awk '{print $8}'`; do
        hadoop fs -test -z $i
        if [[ $? -eq 0 ]]; then
            echo "$i has size 0, deleting it"
            hadoop fs -rm -r -f $i
        fi
    done
}

# Run one export job
export_data() {
    datax_config=$1
    export_dir=$2
    handle_export_path $export_dir
    $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_config
}

case $1 in
"dwd_event_log")
    export_data /opt/module/datax/job/export/xxx.json /warehouse/xxx/dwd/xxx/dt=${do_date}
;;
"all")
    export_data /opt/module/datax/job/export/xxx.json /warehouse/xxx/dwd/xxx/dt=${do_date}
;;
esac
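To see why the cleanup step exists, you can reproduce the test the function relies on by hand (the path is a placeholder and part-00000 is a hypothetical file name):
# `hadoop fs -test -z` exits 0 when the file has zero length,
# which is exactly what handle_export_path checks before deleting
hadoop fs -ls -R /warehouse/xxx/dwd/xxx/dt=2022-04-19
hadoop fs -test -z /warehouse/xxx/dwd/xxx/dt=2022-04-19/part-00000 && echo "zero-length file"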
-- Add execute permission
chmod +x hdfs_to_mongodb.sh
-- Run it
hdfs_to_mongodb.sh all
300,137 records were successfully imported into MongoDB!
One pitfall hit along the way: before the fix described below, every writer task failed during initialization with
ERROR WriterRunner - Writer Runner Received Exceptions:
com.alibaba.fastjson.JSONException: syntax error, pos 1
2022-04-20 16:53:00.138 [0-0-0-reader] INFO UnstructuredStorageReaderUtil - CsvReader使用默认值[{"captureRawRecord":true,"columnCount":0,"comment":"#","currentRecord":-1,"delimiter":"\t","escapeMode":1,"headerCount":0,"rawRecord":"","recordDelimiter":"\u0000","safetySwitch":false,"skipEmptyRecords":true,"textQualifier":"\"","trimWhitespace":true,"useComments":false,"useTextQualifier":true,"values":[]}],csvReaderConfig值为[null]
2022-04-20 16:53:00.205 [0-0-2-writer] ERROR WriterRunner - Writer Runner Received Exceptions:
com.alibaba.fastjson.JSONException: syntax error, pos 1
    at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1295) ~[fastjson-1.1.46.sec01.jar:na]
    at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1205) ~[fastjson-1.1.46.sec01.jar:na]
    at com.alibaba.fastjson.JSON.parse(JSON.java:108) ~[fastjson-1.1.46.sec01.jar:na]
    at com.alibaba.fastjson.JSON.parse(JSON.java:99) ~[fastjson-1.1.46.sec01.jar:na]
    at com.alibaba.fastjson.JSON.parseObject(JSON.java:170) ~[fastjson-1.1.46.sec01.jar:na]
    at com.alibaba.datax.plugin.writer.mongodbwriter.MongoDBWriter$Task.init(MongoDBWriter.java:331) ~[mongodbwriter-0.0.1-SNAPSHOT.jar:na]
    at com.alibaba.datax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:44) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
(writer tasks 0-0-1 and 0-0-0 failed at 16:53:00.206 and 16:53:00.208 with the identical exception and stack trace)
The stack trace points at JSON.parseObject inside MongoDBWriter$Task.init, which suggests the writer parses its writeMode parameter as a JSON object; the plain string "replace" is not valid JSON, so fastjson fails at position 1. Changing this line in the configuration file:
"writeMode": "replace",
to:
"isReplace": "true",
"replaceKey": "id",
solved the problem!
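After regenerating the JSON with the corrected writer parameters, re-running the earlier command goes through cleanly:
# same submit command as before; only the writer config changed
python /opt/module/datax/bin/datax.py -p"-Dexportdir=/warehouse/xxx/dwd/xxx/dt=2022-04-19" /opt/module/datax/job/export/xxx.xxx.json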
That's all. If you found this useful, please give it a like!