1. MongoDB version: 3.6.3. (Rather old; I later found that against it Flink CDC could only monitor a single collection, multiple collections would not work.)
2. DataX version: self-compiled DataX-datax_v202210
3. HDFS version: 3.1.3
4. Hive version: 3.1.2
1. Incremental data: 17 MongoDB collections have to be synced to Hive every hour. Since each record carries a generation timestamp, I use DataX in query mode: loop over the collections and pull the previous hour's data to HDFS with DataX, then use a shell script plus a scheduler to load it into Hive on a timer as the ODS layer, join it with other tables to build the DWD layer, and hand that to the consumers.
2. Full data: historical data is synced to Hive with a DataX script that reads in a loop + scheduling + Hive dynamic partitioning. Hive dynamic partitioning allows only 100 partitions per node by default, and I partition by hour, so I pull only 4 days of data per run (4 days x 24 = 96 hourly partitions, just under the limit; pulling more throws an error), and the script loops for however many days are needed. (A rather clumsy method, better ideas are welcome in the comments; raising the Hive limits, as sketched below, is one alternative.)
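The partition cap is a per-session Hive setting rather than a hard limit. A minimal sketch of raising it, using standard Hive parameters with illustrative values; the SETs must run in the same session as the INSERT that writes the hourly partitions:

hive -e "
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
-- default is 1000 dynamic partitions per job
set hive.exec.max.dynamic.partitions=10000;
-- default is 100 per node: the limit hit above
set hive.exec.max.dynamic.partitions.pernode=1000;
-- ... followed by the INSERT OVERWRITE ... PARTITION(dt, hr) SELECT ... that loads the data
"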
{ "job": { "content": [ { "reader": { "name": "mongodbreader", "parameter": { "address": ["xxxxxxxx:27017"], "authDb": "admin", "userName": "xxxxx", "userPassword": "xxxx", "dbName": "xxxx", "collectionName": "xxxx", "column": [ { "name": "_id", "type": "string" }, { "name": "data", "type": "string" }, { "name": "gid", "type": "string" }, { "name": "text", "type": "string" }, { "name": "time", "type": "bigint" }, { "name": "uid", "type": "string" } ], "query":"{\"time\":{ \"$gte\": ${start_time}, \"$lt\": ${end_time}}}" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "ask_id", "type": "string" }, { "name": "data", "type": "string" }, { "name": "gid", "type": "string" }, { "name": "text", "type": "string" }, { "name": "time", "type": "string" }, { "name": "uid", "type": "string" } ], "compress": "gzip", "defaultFS": "xxxx:8020", "fieldDelimiter": "\t", "fileName": "xxxx", "fileType": "text", "path": "${targetdir}", "writeMode": "append" } } } ], "setting": { "speed": { "channel": 1 } } } }
There are two pitfalls in this config.
First pitfall: when DataX connects to MongoDB, watch the "authDb": "admin" setting. It must name the database where the sync account authenticates: whichever database the account was created in is the one to put here. Because every MongoDB database authenticates separately, I kept getting:
com.alibaba.datax.common.exception.DataXException: Code:[Framework-02], Description:[DataX引擎运行过程出错,具体原因请参看DataX运行结束时的错误诊断信息 .]. - com.mongodb.MongoCommandException: Command failed with error 13: 'command count requires authentication' on server xxx:27117. The full response is { "ok" : 0.0, "errmsg" : "command count requires authentication", "code" : 13, "codeName" : "Unauthorized" }
After going through a lot of material, there are two ways to solve the authentication problem. One is, as mentioned above, to specify the account's authentication database; the other is, for each database to be synced, to grant the account access on that database once more, like so:
db.createUser({user: "xxxxx", pwd: "xxxxxx", roles: [{"role": "read", "db": "xxxx"}]})
Query-based sync does not need high privileges; read is enough.
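A quick way to confirm which database an account authenticates against is to pass --authenticationDatabase to the mongo shell and run the very count that the error above complains about; host, credentials and names here are placeholders:

mongo --host xxxxxxxx:27017 -u xxxxx -p 'xxxx' --authenticationDatabase admin \
    --eval 'db.getSiblingDB("xxxx").getCollection("xxxx").count()'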
Second pitfall: MongoDB's query condition is written as a JSON string. Someone online has shared a source-code analysis showing that comma-separated conditions are combined as AND, so an OR requires multiple queries. (Though in my tests it was not all AND either: when the same field appears several times, the last condition seems to win. Something to dig into later.) Well, that is what I get for being too lazy to write my own code, so I made do. Here is what a query with multiple conditions looks like (and see the $or sketch after it):
"query":"{\"time\":{ \"$gte\": 1646064000, \"$lte\": 1648742399},\"time\":{ \"$gte\": 1654012800, \"$lte\": 1656604799},\"time\":{ \"$gte\": 1661961600, \"$lte\": 1664553599}}"
#!/bin/bash
# Variables, defined up front for easy modification
APP=xxx
TABLE=xxx
DATAX_HOME=xxxx

# If a date argument is given, use it; otherwise take the hour before now
if [ -n "$1" ]; then
    do_date=$1
else
    do_date=$(date -d '-1 hour' +%Y%m%d%H)
fi
hr1=${do_date:8:2}
date1=${do_date:0:8}
hdfs_path=xxx

# Prepare the target path: create it if it does not exist, empty it if it does,
# so that the sync job can be re-run safely
hadoop fs -test -e $hdfs_path
if [[ $? -eq 1 ]]; then
    echo "Path $hdfs_path does not exist, creating it......"
    hadoop fs -mkdir -p $hdfs_path
else
    echo "Path $hdfs_path already exists"
    fs_count=$(hadoop fs -count $hdfs_path)
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
        echo "Path $hdfs_path is empty"
    else
        echo "Path $hdfs_path is not empty, emptying it......"
        hadoop fs -rm -r -f $hdfs_path/*
    fi
fi

# Data sync: loop over the collections
for i in xxx xxx xxx
do
    echo ================== $i load date: $do_date ==================
    python $DATAX_HOME/bin/datax.py -p"-Dcollection=$i -Dtargetdir=$hdfs_path" $DATAX_HOME/xxx
done
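Loading the landed files into the hourly ODS partition is then one more step at the end of the same script; a minimal sketch, where the ods.ods_xxx table name and the dt/hr partition columns are illustrative:

hive -e "load data inpath '$hdfs_path' overwrite into table ods.ods_xxx partition(dt='$date1', hr='$hr1');"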
For syncing MongoDB to ES there is a dedicated tool, monstache; I know of it but have not used it yet, so I leave that for later. Being pressed for time, I used DataX here too. Three things to note (a mapping sketch follows the list):
1. Fields in object format can be read by DataX as string and turned back into object when written into ES
2. Field names that collide with ES keywords are not a problem
3. To use ES Chinese tokenization for word-frequency statistics, besides configuring the IK analyzer you also need fielddata=true
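On point 3: fielddata is off by default for text fields and has to be enabled in the index mapping. A minimal sketch, assuming an ES 7+ cluster without mapping types (on the 6.x-era cluster that the "type" setting below implies, wrap properties inside the document type) and the placeholder endpoint and index from the config:

curl -X PUT "xxxxxx:9200/xxxx" -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "text": { "type": "text", "analyzer": "ik_smart", "fielddata": true }
    }
  }
}'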
{ "job": { "content": [ { "reader": { "name": "mongodbreader", "parameter": { "address": ["xxxx:27017"], "userName": "xxx", "authDb": "xxx", "userPassword": "xxxx", "dbName": "xxxx", "collectionName": "${collection}", "column": [ { "name": "_id", "type": "string" #原有格式为objectid,用此处用string }, { "name": "data", "type": "string" #原有格式为list(object),用string可以倒进去 }, { "name": "gid", "type": "string" }, { "name": "text", "type": "string" }, { "name": "time", "type": "bigint" }, { "name": "uid", "type": "string" }, { "name": "deleted", "type": "bigint" } ], "query":"{\"time\":{ \"$gte\": 1661961600, \"$lte\": 1664553599}}" } }, "writer": { "name": "elasticsearchwriter", "parameter": { "endpoint": "xxxxxx:9200", "index": "xxxx", "type": "xxxx", "cleanup": false, "settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}}, "discovery": false, "batchSize": 2048, "splitter": ",", "column": [ { "name": "_id", "type": "id" }, { "name": "data", "type": "object" #源数据为object,此处也为object }, { "name": "gid", "type": "keyword" }, { "name": "text",#即使和关键词重名也不影响,挺好 "type": "text","analyzer": "ik_smart" },#此处想用es分词,来统计词频的小伙伴建议开启filedata:true,不知道能不能用哈,反正我知道不开启,不能用,有兴趣可以研究下,告诉我 { "name": "time", "type": "long" }, { "name": "uid", "type": "keyword" }, { "name": "deleted", "type": "long" } ] } } } ], "setting": { "speed": { "channel": 4 } } } }
The rest was fairly straightforward and I was too lazy to write it down; I will add to this if problems come up later.