当前位置:   article > 正文

Datax从入门到精通09-MongoDB同步到Hive实践_尚硅谷mongodb到hive课程

尚硅谷mongodb到hive课程

一、MongoDB

1. 时区概念

GMT 就是格林威治标准时间的英文缩写(Greenwich Mean Time 格林尼治标准时间),是世界标准时间,GMT+8 是格林威治时间+8小时,中国所在时区就是gmt+8 。在国内,本地时间和“GMT+8”时区无区别。在国外,本地时间每个地方都不相同,所以只能用一条一条的经线计算时间。

然而,mongodb默认就是UTC时间,服务器端貌似无法设置,万能的网友提供的解决方案很多,网上一大堆,其根本原因就是使用JAVA写入的时候mongo驱动里面做了转换。
参考https://blog.csdn.net/jsdxshi/article/details/72831116
解决方案: https://blog.csdn.net/weixin_26782843/article/details/114094264

2. Mongo常用命令
mongo mongodb://user:pwd@dds-xx.mongodb.rds.aliyuncs.com:3717/act_operation_message
MongoDB shell version v4.2.3
Implicit session: session { "id" : UUID("f293288e-5750-4898-95cf-5fa406376bea") }
MongoDB server version: 4.2.10
mgset-12038389:PRIMARY> show collections;
db.sms_notify_send_record_202004.find({"messageId":"0ebfa545-6bd6-4e3a-bfa5-456bd60e3a2f"}).pretty();
db.messageResponseResult.find({"_id":ObjectId("60462f0e2d23b30f871dcef9")}).pretty();
{
	"_id" : ObjectId("60462f0e2d23b30f871dcef9"),
	"eventType" : "E_SEND_UNACTIVATED_MSG",
	"message" : "设备不在线或内部处理异常",
	"traceId" : "08380500-bfce-41ee-931f-e2c7e6403e49",
	"receiverId" : "wxid_bz5tgkhxnegp12",
	"senderWeChatId" : "wxid_og03p4hvd9so12",
	"stuId" : 1444271,
	"selId" : 42711,
	"status" : 2,
	"createTime" : ISODate("2021-03-08T22:05:02.952Z"),
	"updateTime" : ISODate("2021-03-08T22:05:02.952Z")
}

db.messageResponseResult.find({"updateTime":{"$gte":ISODate("2021-03-16"),"$lt":ISODate("2021-03-17")}}).count(true) 
db.messageResponseResult.find({"updateTime":{"$gte":ISODate("2021-03-16T00:00:00.000Z"),"$lt":ISODate("2021-03-17T00:00:00.000Z")}}).count(true)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
3. MongoDB & DataX类型对比:

在这里插入图片描述

4. MongoDB Java API 构造数据记录:
import com.alibaba.fastjson.JSONObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

import java.util.*;

/**
 * mongo mongodb://user:pwd@dds-xx.mongodb.rds.aliyuncs.com:3717/test
 */
public class Mongo2hive {
    public static void main(String[] args) {
        try {
            ServerAddress serverAddress = new ServerAddress("dds-uf6f9645182d5f242.mongodb.rds.aliyuncs.com", 3717);
            List<ServerAddress> addrs = new ArrayList<ServerAddress>();
            addrs.add(serverAddress);

            //MongoCredential.createScramSha1Credential()三个参数分别为 用户名 数据库名称 密码
            MongoCredential credential = MongoCredential.createScramSha1Credential("user", "db", "pwd".toCharArray());
            List<MongoCredential> credentials = new ArrayList<MongoCredential>();
            credentials.add(credential);
            //通过连接认证获取MongoDB连接
            MongoClient mongoClient = new MongoClient(addrs, credentials);

            //连接到数据库
            MongoDatabase mongoDatabase = mongoClient.getDatabase("zm-ai-test");
            // 创建collection
            // mongoDatabase.createCollection("test_table_01");

            MongoCollection<Document> collection = mongoDatabase.getCollection("test_table_01");

            JSONObject jsonObject = new JSONObject();
            jsonObject.put("name","李华");
            jsonObject.put("age",22);

            //插入文档 构造测试数据
            List<Document> documents = new ArrayList<Document>(2048);
            for (int i = 0; i < 1000; i++) {
                Document document = new Document("title", "course-" + i).
                        append("uuid", UUID.randomUUID()).
                        append("price", 999.99).
                        append("info",jsonObject.toJSONString()).
                        append("created_time",new Date().getTime());
                documents.add(document);
            }
            collection.insertMany(documents);

            // 遍历文档
            // FindIterable<Document> findIterable = collection.find();
            // MongoCursor<Document> mongoCursor = findIterable.iterator();
            // while(mongoCursor.hasNext()){
            //     System.out.println(mongoCursor.next());
            // }

            // 获取列类型
            Set<String> keys = new HashSet<>();
            for (Document doc : collection.find().limit(5)) {
                keys.addAll(doc.keySet());
            }
            keys.stream().forEach( k -> System.out.println("col = " +k));


            System.out.println("Count ---> " + collection.countDocuments());

        } catch (Exception e) {
            System.err.println(e.getClass().getName() + ": " + e.getMessage());
        }


    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

二、全量抽取

仅仅是设置query为空即可,可以参考官网 ,这里需要说明的是mongo的时间会在抽取过来的时候加上8个小时,也就是UTC时间变为了GMT+8,以下是个对比
在这里插入图片描述

Hive中的时间转换如下:

---- 将mongodb的UTC时间转化为GMT+8时间  28800/3600=8 小时
hive> select from_unixtime(unix_timestamp(regexp_replace('2021-03-08T22:05:02.952Z', 'T|Z', ' '))+28800,'yyyy-MM-dd HH:mm:ss');
OK
2021-03-09 06:05:02
Time taken: 0.295 seconds, Fetched: 1 row(s)
  • 1
  • 2
  • 3
  • 4
  • 5

这说明,DataX在抽取的时候已经自动对其进行了转化。

三、增量抽取

示例脚本如下:

{
    "job":{
        "content":[
            {
                "reader":{
                    "name":"mongodbreader",
                    "parameter":{
                        "address":[
                            "dds-xxx.mongodb.rds.aliyuncs.com:3717",
                            "dds-xxx.mongodb.rds.aliyuncs.com:3717"
                        ],
                        "collectionName":"messageResponseResult",
                        "column":[
                            {
                                "index":0,
                                "name":"_id",
                                "type":"string"
                            },
                            {
                                "index":1,
                                "name":"eventType",
                                "type":"string"
                            },
                            {
                                "index":2,
                                "name":"message",
                                "type":"string"
                            },
                            {
                                "index":3,
                                "name":"traceId",
                                "type":"string"
                            },
                            {
                                "index":4,
                                "name":"receiverId",
                                "type":"string"
                            },
                            {
                                "index":5,
                                "name":"senderWeChatId",
                                "type":"string"
                            },
                            {
                                "index":6,
                                "name":"stuId",
                                "type":"int"
                            },
                            {
                                "index":7,
                                "name":"les_id",
                                "type":"int"
                            },
                            {
                                "index":8,
                                "name":"status",
                                "type":"int"
                            },
                            {
                                "index":9,
                                "name":"createTime",
                                "type":"date"
                            },
                            {
                                "index":10,
                                "name":"updateTime",
                                "type":"date"
                            }
                        ],
                        "dbName":"act_operation_message",
                        "userName":"xxxx",
                        "userPassword":"xxx",
                        "query":"{\"updateTime\":{\"$gte\":ISODate(\"2021-03-16T00:00:00.000Z\"),\"$lt\":ISODate(\"2021-03-17T00:00:00.000Z\")}}"
                    }
                },
                "writer":{
                    "name":"hdfswriter",
                    "parameter":{
                        "column":[
                        {
                                "name":"_id",
                                "type":"string"
                            },
                            {
                                "name":"eventType",
                                "type":"string"
                            },
                            {
                                "name":"message",
                                "type":"string"
                            },
                            {
                                "name":"traceId",
                                "type":"string"
                            },
                            {
                                "name":"receiverId",
                                "type":"string"
                            },
                            {
                                "name":"senderWeChatId",
                                "type":"string"
                            },
                            {
                                "name":"stuId",
                                "type":"int"
                            },
                            {
                                "name":"les_id",
                                "type":"int"
                            },
                            {
                                "name":"status",
                                "type":"int"
                            },
                            {
                                "name":"createTime",
                                "type":"string"
                            },
                            {
                                "name":"updateTime",
                                "type":"string"
                            }
                        ],
                        "defaultFS":"hdfs://nameservice1",
                        "haveKerberos":"true",
                        "kerberosKeytabFilePath":"/home/zmbigdata/kerberos/zm_app_prd.keytab",
                        "kerberosPrincipal":"zm_app_prd@FAYSON.COM",
                        "encoding":"UTF-8",
                        "fileType":"orc",
                        "fileName":"ods_message_response_result_df",
                        "path":"/user/hive/warehouse/test_ods.db/ods_message_response_result_df/pt=2021-03-17",
                        "writeMode":"append",
                         "compress":"SNAPPY",
                         "fieldDelimiter":"\u0001"
                    }
                }
            }
        ],
        "setting":{
            "speed":{
                "channel":"2"
            },
            "errorLimit":{
                "record":0
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149

增量数据抽取数据量对比:

#mongdb查询:
db.messageResponseResult.find({"updateTime":{"$gte":ISODate("2021-03-16T00:00:00.000Z"),"$lt":ISODate("2021-03-17T00:00:00.000Z")}}).count(true)
6781

#datax同步:
"query":"{\"updateTime\":{\"$gte\":ISODate(\"2021-03-16T00:00:00.000Z\"),\"$lt\":ISODate(\"2021-03-17T00:00:00.000Z\")}}"

2021-03-17 17:40:36.978 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-03-17 17:40:25
任务结束时刻                    : 2021-03-17 17:40:36
任务总计耗时                    :                 11s
任务平均流量                    :          105.99KB/s
记录写入速度                    :            678rec/s
读出记录总数                    :                6781
读写失败总数                    :                   0

# 数据量可以看出是一致的。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

将updateTime type改为Date后对结果进行了再次check,发现时间还是多了8个小时,如下:

hive> select `_id`,message,updatetime from ods_message_response_result_df where pt = '2021-03-17' and `_id`='60462f0e2d23b30f871dcef9';
OK
60462f0e2d23b30f871dcef9	设备不在线或内部处理异常	2021-03-09 06:05:02
Time taken: 23.031 seconds, Fetched: 1 row(s)
  • 1
  • 2
  • 3
  • 4

总结

关于时间差问题,可以在开发的时候就已经转化好,这样在提取的时候就是正常东八区时间了。

参考:
https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md

mongoDB数据类型:https://www.cnblogs.com/YWDCB/p/9453887.html

MongoDB存取时间差问题解决方案:https://www.jb51.net/article/147182.htm

MongDB时区问题:https://cloud.tencent.com/developer/article/1446798

如果对你有帮助,不妨点个赞

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/141155
推荐阅读
相关标签