当前位置:   article > 正文

datax 同步mongodb数据库到hive(hdfs)和elasticserch(es)_datax mongo

datax mongo

一、同步环境

1.mongodb版本:3.6.3。(有点老了,后来发现flinkcdc都只能监控一张表,多张表无法监控)
2.datax版本:自己编译的DataX-datax_v202210
3.hdfs版本:3.1.3
4.hive版本:3.1.2

二、同步思路

1.增量数据:需要每隔1小时将mongodb中17个集合的数据同步至hive,因为有数据生成时间,才用datax查询方式,将上一个小时的数据依次循环调用datax同步至hdfs,利用shell脚本和调度器定时装载至hive中形成ods层,并和其他表关联处理形成dwd层,提供给需求方。
2.全量数据:历史数据才用datax编写脚本循环读取+调度+hive动态分区方式同步至hive。因为hive动态分区默认只支持100个分区,我是按小时进行分区的,因此我每次只拉取4天数据,拉取太多报错,编写脚本,需要多少天,拉取多少天。(比较笨的方法,有更好的方式欢迎评论区讨论)

三、datax配置

{
    "job": {
        "content": [
          {
              "reader": {
                  "name": "mongodbreader",
                  "parameter": {
                      "address": ["xxxxxxxx:27017"],
                      "authDb": "admin",
                      "userName": "xxxxx",
                      "userPassword": "xxxx",
                      "dbName": "xxxx",
                      "collectionName": "xxxx",
                      "column": [
                          {
                              "name": "_id",
                              "type": "string"
                          },
                          {
                              "name": "data",
                              "type": "string"
                          },
                          {
                              "name": "gid",
                              "type": "string"
                          },
                          {
                              "name": "text",
                              "type": "string"
                          },
                          {
                              "name": "time",
                              "type": "bigint"
                          },
                          {
                              "name": "uid",
                              "type": "string"
                          }
                      ],

                      "query":"{\"time\":{ \"$gte\": ${start_time}, \"$lt\": ${end_time}}}"

                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                             {
                              "name": "ask_id",
                              "type": "string"
                          },
                          {
                              "name": "data",
                              "type": "string"
                          },
                          {
                              "name": "gid",
                              "type": "string"
                          },
                          {
                              "name": "text",
                              "type": "string"
                          },
                          {
                              "name": "time",
                              "type": "string"
                          },
                          {
                              "name": "uid",
                              "type": "string"
                          }
                        ],
                        "compress": "gzip",
                        "defaultFS": "xxxx:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "xxxx",
                        "fileType": "text",
                        "path": "${targetdir}",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

这里面有两个坑。
第一个:datax连接mongodb一定注意"authDb": “admin”,这个配置,要明确同步账号认证库的位置,账号在那个库里面认证的就写哪个库,由于mongodb每个库是单独认证的,一直报:

com.alibaba.datax.common.exception.DataXException: Code:[Framework-02], Description:[DataX引擎运行过程出错,具体原因请参看DataX运行结束时的错误诊断信息  .].  - com.mongodb.MongoCommandException: Command failed with error 13: 'command count requires authentication' on server xxx:27117. The full response is { "ok" : 0.0, "errmsg" : "command count requires authentication", "code" : 13, "codeName" : "Unauthorized" }
  • 1

找过很多资料,两种方式解决账号认证问题。一种是,刚才提到的指明账号认证库;第二种,就是同步哪个库,单独给这个账号再授权一遍库的权限,代码如下:

db.createUser({user:"x x 
x x x",pwd:"xxxxxx",roles:[{"role":"read","db":"xxxx"}]})
  • 1
  • 2

查询同步不需要太高的权限,read即可
第二坑:mongodb的query查询,用的是json语句,网上有大神分享的源码分析,里面的查询条件是“and”语句,也就是说,用逗号分隔的查询条件是and,想用or要多次查询(但是我测试十几也不全是and,好像是同样的字段以最后一条为准,留着后面再研究班),哎,没办法,谁让我懒得自己写代码,凑合着用吧。分享query查询语句多个条件的用法:

                      "query":"{\"time\":{ \"$gte\": 1646064000, \"$lte\": 1648742399},\"time\":{ \"$gte\": 1654012800, \"$lte\": 1656604799},\"time\":{ \"$gte\": 1661961600, \"$lte\": 1664553599}}"

  • 1
  • 2

四、datax同步调度脚本

#!/bin/bash

# 定义变量方便修改
APP=xxx
TABLE=xxx
DATAX_HOME=xxxx
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一小时

    do_date=2022111416
   hr1=${do_date: 8: 2}
   date1=${do_date: 0: 8}
  

hdfs_path=xxx

#处理目标路径,此处的处理逻辑是,如果目标路径不存在,则创建;若存在,则清空,目的是保证同步任务可重复执行
  hadoop fs -test -e $hdfs_path
  if [[ $? -eq 1 ]]; then
    echo "路径 $hdfs_path 不存在,正在创建......"
    hadoop fs -mkdir -p $hdfs_path
  else
    echo "路径 $hdfs_path 已经存在"
    fs_count=$(hadoop fs -count $hdfs_path)
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
      echo "路径$hdfs_path为空"
    else
      echo "路径$hdfs_path不为空,正在清空......"
      hadoop fs -rm -r -f $hdfs_path/*
    fi
  fi


#数据同步

for i in  xxx xxx xxx
do  
echo ================== $i 装载日期为 $do_date ==================
python  $DATAX_HOME/bin/datax.py -p"-Dcollection=$i -Dtargetdir=$hdfs_path"   $DATAX_HOME/xxx
done 

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

五、datax同步至es 配置

mongodb同步至es有一个专用的组件,monstache;知道,但还没用过,留白,由于时间紧张用的datax,此处三个注意点:
1.object格式可以datax读取的时候可用string,导入es再改回object
2.es重名没问题
3.想用es中文分词统计词频,除了要配置中文ik,也需要filedata=true;

{
    "job": {
        "content": [
          {
              "reader": {
                  "name": "mongodbreader",
                  "parameter": {
                      "address": ["xxxx:27017"],
                      "userName": "xxx",
                      "authDb": "xxx",
                      "userPassword": "xxxx",
                      "dbName": "xxxx",
                      "collectionName": "${collection}",
                      "column": [
                          {
                              "name": "_id",
                              "type": "string" #原有格式为objectid,用此处用string
                          },
                          {
                              "name": "data",
                              "type": "string" #原有格式为list(object),用string可以倒进去
                          },
                          {
                              "name": "gid",
                              "type": "string"
                          },
                          {
                              "name": "text",
                              "type": "string"
                          },
                          {
                              "name": "time",
                              "type": "bigint"
                          },
                          {
                              "name": "uid",
                              "type": "string"
                          },
                          {
                              "name": "deleted",
                              "type": "bigint"
                          }
                      ],

                      "query":"{\"time\":{ \"$gte\": 1661961600, \"$lte\": 1664553599}}"
                  }
                },
                "writer": {
                "name": "elasticsearchwriter",
                "parameter": {
                  "endpoint": "xxxxxx:9200",
                  "index": "xxxx",
                  "type": "xxxx",
                  "cleanup": false,
                  "settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
                  "discovery": false,
                  "batchSize": 2048,
                  "splitter": ",",
                  "column": [
                   {
                              "name": "_id",
                              "type": "id"
                          },
                          {
                              "name": "data",
                              "type": "object" #源数据为object,此处也为object
                          
                          },
                          {
                              "name": "gid",
                              "type": "keyword"
                          },
                          {
                              "name": "text",#即使和关键词重名也不影响,挺好
                              "type": "text","analyzer": "ik_smart"
                          },#此处想用es分词,来统计词频的小伙伴建议开启filedata:true,不知道能不能用哈,反正我知道不开启,不能用,有兴趣可以研究下,告诉我
                          {
                              "name": "time",
                              "type": "long"
                          },
                          {
                              "name": "uid",
                              "type": "keyword"
                          },
                          {
                              "name": "deleted",
                              "type": "long"
                          }
                  ]
                }
              }
            }
        ],
        "setting": {
            "speed": {
                "channel": 4
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100

六、其他问题

其他就比较简单了,懒得记了,后面有问题再补充

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/141131
推荐阅读
相关标签
  

闽ICP备14008679号