
Exporting Hive Table Data to MongoDB with DataX (2022)

Contents

Background:

The problem to solve:

Installing DataX

1) Prerequisites

2) Download

3) Upload the package

4) Extract

5) Self-check script

Configuring environment variables

Usage

1. Reading HDFS data and writing to MongoDB

1) View the official template

2) Write the job file

3) Test the generated DataX job file

4) Write the export script

Watch out for this error:

The fix:


Background:

Most articles I found online cover importing MongoDB data into Hive; very few cover exporting from Hive to MongoDB. A sibling team is building a user-profile system on MongoDB (chosen because it makes field extension easy) and needs data from several of our warehouse tables, which raised the question: how do we export Hive table data to MongoDB?

We already use Sqoop to move MySQL data into HDFS, but Sqoop is geared toward relational databases and offers little support for NoSQL stores like MongoDB, so I turned to Alibaba's DataX. After some evaluation, DataX does handle this scenario.

The problem to solve:

Export Hive table data to MongoDB using DataX.

Installing DataX

1) Prerequisites

- Linux

- JDK (1.8 or later; 1.8 recommended)

- Python (2.6.x recommended)

2) Download

http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

3) Upload the package

cd /opt/software

-- upload datax.tar.gz here

4) Extract

tar -zxvf datax.tar.gz -C /opt/module/

5) Self-check script

cd /opt/module/datax/bin/

python datax.py /opt/module/datax/job/job.json

If the bundled sample job completes without errors, the installation is working.

Configuring environment variables

#DATAX_HOME

export DATAX_HOME=/opt/module/datax

export PATH=$PATH:$DATAX_HOME/bin

Usage

1. Reading HDFS data and writing to MongoDB

1) View the official template

python bin/datax.py -r hdfsreader -w mongodbwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the hdfsreader document:
    https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md

Please refer to the mongodbwriter document:
    https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md

Please save the following configuration as a json file and use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [],
                        "defaultFS": "",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "orc",
                        "path": ""
                    }
                },
                "writer": {
                    "name": "mongodbwriter",
                    "parameter": {
                        "address": [],
                        "collectionName": "",
                        "column": [],
                        "dbName": "",
                        "upsertInfo": {
                            "isUpsert": "",
                            "upsertKey": ""
                        },
                        "userName": "",
                        "userPassword": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

2) Write the job file

- Job-file generator script:

gen_export_config.py

# coding=utf-8
import json
import getopt
import os
import sys
import pymongo

# MongoDB connection settings; adjust to your environment.
# (The writer block below still uses xxx placeholders; fill in real values there too.)
mongodb_host = "xxxx.mongodb.rds.aliyuncs.com"
mongodb_port = "3717"
mongodb_user = "xxx"
mongodb_passwd = "xxx"

# HDFS NameNode settings live in hadoopConfig below; adjust to your cluster.
# Target directory for the generated job files; adjust as needed.
output_path = "/opt/module/datax/job/export"


def generate_json(target_database, target_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "${exportdir}",
                        "defaultFS": "hdfs://xx",
                        "hadoopConfig": {
                            "dfs.nameservices": "xx",
                            "dfs.ha.namenodes.mycluster": "nn1,nn2,nn3",
                            # one distinct rpc-address key per NameNode (nn1/nn2/nn3);
                            # the xxx placeholders below must not stay identical
                            "dfs.namenode.rpc-address.xxx": "xxx",
                            "dfs.namenode.rpc-address.xxx": "xxx",
                            "dfs.namenode.rpc-address.xxx": "xxx",
                            "dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "column": ["*"],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "nullFormat": "\\N"
                    }
                },
                "writer": {
                    "name": "mongodbwriter",
                    "parameter": {
                        "address": ["xxx"],
                        "collectionName": "xxx",
                        "column": [
                            {"name": "id", "type": "string"},
                            {"name": "channel", "type": "string"},
                            {"name": "platform", "type": "string"}
                        ],
                        "dbName": "xxx",
                        # NOTE: this key triggers the fastjson "syntax error, pos 1"
                        # described below; see the fix at the end of the article
                        "writeMode": "replace",
                        "userName": "xxx",
                        "userPassword": "xxx"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    target_database = ""
    target_table = ""
    options, arguments = getopt.getopt(args, '-d:-t:', ['targetdb=', 'targettbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--targetdb'):
            target_database = opt_value
        if opt_name in ('-t', '--targettbl'):
            target_table = opt_value
    generate_json(target_database, target_table)


if __name__ == '__main__':
    main(sys.argv[1:])

-- Install the Python MongoDB driver (on all three machines)

sudo yum install -y pymongo
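
Before generating job files, it can help to confirm the driver actually reaches the instance. A minimal sketch, assuming the placeholder host, port, and credentials from gen_export_config.py:

# coding=utf-8
# Connectivity smoke test (sketch): replace the xxx placeholders with the
# real MongoDB host, port, and credentials before running.
import pymongo

client = pymongo.MongoClient("mongodb://xxx:xxx@xxxx.mongodb.rds.aliyuncs.com:3717/admin")
print(client.server_info()["version"])  # prints the server version on success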

-- Generate the DataX job file

python gen_export_config.py -d xxx -t xxx

- Write a wrapper script

gen_export_config.sh

vim ~/bin/gen_export_config.sh

#!/bin/bash
python ~/bin/gen_export_config.py -d xxx -t xxx

Add execute permission:

chmod +x ~/bin/gen_export_config.sh

-- Generate the job file

gen_export_config.sh 

3) Test the generated DataX job file

Run the job against one partition. datax.py substitutes -D parameters into the matching ${...} placeholders in the job file, so -Dexportdir fills the reader's "path": "${exportdir}":

python /opt/module/datax/bin/datax.py -p"-Dexportdir=/warehouse/xxx/dwd/xxx/dt=2022-04-19" /opt/module/datax/job/export/xxx.xxx.json

Then check MongoDB: the rows from that partition should now appear in the target collection.
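
To verify from Python instead of the MongoDB shell, a minimal pymongo sketch (the URI and db/collection names are the xxx placeholders from the job file, not real values):

# coding=utf-8
# Row-count sanity check (sketch): the count should match the number of
# rows in the exported Hive partition.
import pymongo

client = pymongo.MongoClient("mongodb://xxx:xxx@xxxx.mongodb.rds.aliyuncs.com:3717/admin")
print(client["xxx"]["xxx"].count())  # on pymongo >= 3.7, prefer count_documents({})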

4) Write the export script

vim hdfs_to_mongodb.sh 

#!/bin/bash
DATAX_HOME=/opt/module/datax

# Use the date passed as the second argument; default to yesterday
if [ -n "$2" ]; then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

# DataX cannot read empty files from the export path; this function removes them
handle_export_path(){
    for i in `hadoop fs -ls -R $1 | awk '{print $8}'`; do
        hadoop fs -test -z $i
        if [[ $? -eq 0 ]]; then
            echo "$i is empty, deleting"
            hadoop fs -rm -r -f $i
        fi
    done
}

# Run the export
export_data() {
    datax_config=$1
    export_dir=$2
    handle_export_path $export_dir
    $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_config
}

case $1 in
"dwd_event_log")
    export_data /opt/module/datax/job/export/xxx.json /warehouse/xxx/dwd/xxx/dt=${do_date}
    ;;
"all")
    export_data /opt/module/datax/job/export/xxx.json /warehouse/xxx/dwd/xxx/dt=${do_date}
    ;;
esac

-- Add execute permission

chmod +x hdfs_to_mongodb.sh 

-- Run

hdfs_to_mongodb.sh all

Success: 300,137 rows exported to MongoDB!

Watch out for this error:

ERROR WriterRunner - Writer Runner Received Exceptions:
com.alibaba.fastjson.JSONException: syntax error, pos 1

2022-04-20 16:53:00.138 [0-0-0-reader] INFO UnstructuredStorageReaderUtil - CsvReader使用默认值[{"captureRawRecord":true,"columnCount":0,"comment":"#","currentRecord":-1,"delimiter":"\t","escapeMode":1,"headerCount":0,"rawRecord":"","recordDelimiter":"\u0000","safetySwitch":false,"skipEmptyRecords":true,"textQualifier":"\"","trimWhitespace":true,"useComments":false,"useTextQualifier":true,"values":[]}],csvReaderConfig值为[null]
2022-04-20 16:53:00.205 [0-0-2-writer] ERROR WriterRunner - Writer Runner Received Exceptions:
com.alibaba.fastjson.JSONException: syntax error, pos 1
	at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1295) ~[fastjson-1.1.46.sec01.jar:na]
	at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1205) ~[fastjson-1.1.46.sec01.jar:na]
	at com.alibaba.fastjson.JSON.parse(JSON.java:108) ~[fastjson-1.1.46.sec01.jar:na]
	at com.alibaba.fastjson.JSON.parse(JSON.java:99) ~[fastjson-1.1.46.sec01.jar:na]
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:170) ~[fastjson-1.1.46.sec01.jar:na]
	at com.alibaba.datax.plugin.writer.mongodbwriter.MongoDBWriter$Task.init(MongoDBWriter.java:331) ~[mongodbwriter-0.0.1-SNAPSHOT.jar:na]
	at com.alibaba.datax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:44) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]

(The identical exception and stack trace are thrown by the other writer threads, 0-0-1-writer and 0-0-0-writer, as well.)

The fix:

Change this line in the job configuration:

"writeMode": "replace",

to:

"isReplace": "true",
"replaceKey": "id",

and the problem is solved.
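
For job files that were already generated, the same fix can be applied in place instead of regenerating them. A minimal sketch (the path is an example; point it at your generated json), and remember to make the same change in gen_export_config.py so newly generated files are correct:

# coding=utf-8
# Patch a generated DataX job file: swap the problematic "writeMode" key
# for the isReplace/replaceKey pair that mongodbwriter parses correctly.
import json

path = "/opt/module/datax/job/export/xxx.xxx.json"  # example path

with open(path) as f:
    job = json.load(f)

writer = job["job"]["content"][0]["writer"]["parameter"]
writer.pop("writeMode", None)   # remove the offending key
writer["isReplace"] = "true"    # a string, matching the fix above
writer["replaceKey"] = "id"     # unique field used to replace existing docs

with open(path, "w") as f:
    json.dump(job, f)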

That's it. If you found this post useful, please give it a like!
