赞
踩
GMT 就是格林威治标准时间的英文缩写(Greenwich Mean Time 格林尼治标准时间),是世界标准时间,GMT+8 是格林威治时间+8小时,中国所在时区就是gmt+8 。在国内,本地时间和“GMT+8”时区无区别。在国外,本地时间每个地方都不相同,所以只能用一条一条的经线计算时间。
然而,mongodb默认就是UTC时间,服务器端貌似无法设置,万能的网友提供的解决方案很多,网上一大堆,其根本原因就是使用JAVA写入的时候mongo驱动里面做了转换。
参考https://blog.csdn.net/jsdxshi/article/details/72831116
解决方案: https://blog.csdn.net/weixin_26782843/article/details/114094264
mongo mongodb://user:pwd@dds-xx.mongodb.rds.aliyuncs.com:3717/act_operation_message MongoDB shell version v4.2.3 Implicit session: session { "id" : UUID("f293288e-5750-4898-95cf-5fa406376bea") } MongoDB server version: 4.2.10 mgset-12038389:PRIMARY> show collections; db.sms_notify_send_record_202004.find({"messageId":"0ebfa545-6bd6-4e3a-bfa5-456bd60e3a2f"}).pretty(); db.messageResponseResult.find({"_id":ObjectId("60462f0e2d23b30f871dcef9")}).pretty(); { "_id" : ObjectId("60462f0e2d23b30f871dcef9"), "eventType" : "E_SEND_UNACTIVATED_MSG", "message" : "设备不在线或内部处理异常", "traceId" : "08380500-bfce-41ee-931f-e2c7e6403e49", "receiverId" : "wxid_bz5tgkhxnegp12", "senderWeChatId" : "wxid_og03p4hvd9so12", "stuId" : 1444271, "selId" : 42711, "status" : 2, "createTime" : ISODate("2021-03-08T22:05:02.952Z"), "updateTime" : ISODate("2021-03-08T22:05:02.952Z") } db.messageResponseResult.find({"updateTime":{"$gte":ISODate("2021-03-16"),"$lt":ISODate("2021-03-17")}}).count(true) db.messageResponseResult.find({"updateTime":{"$gte":ISODate("2021-03-16T00:00:00.000Z"),"$lt":ISODate("2021-03-17T00:00:00.000Z")}}).count(true)
import com.alibaba.fastjson.JSONObject; import com.mongodb.MongoClient; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import org.bson.Document; import java.util.*; /** * mongo mongodb://user:pwd@dds-xx.mongodb.rds.aliyuncs.com:3717/test */ public class Mongo2hive { public static void main(String[] args) { try { ServerAddress serverAddress = new ServerAddress("dds-uf6f9645182d5f242.mongodb.rds.aliyuncs.com", 3717); List<ServerAddress> addrs = new ArrayList<ServerAddress>(); addrs.add(serverAddress); //MongoCredential.createScramSha1Credential()三个参数分别为 用户名 数据库名称 密码 MongoCredential credential = MongoCredential.createScramSha1Credential("user", "db", "pwd".toCharArray()); List<MongoCredential> credentials = new ArrayList<MongoCredential>(); credentials.add(credential); //通过连接认证获取MongoDB连接 MongoClient mongoClient = new MongoClient(addrs, credentials); //连接到数据库 MongoDatabase mongoDatabase = mongoClient.getDatabase("zm-ai-test"); // 创建collection // mongoDatabase.createCollection("test_table_01"); MongoCollection<Document> collection = mongoDatabase.getCollection("test_table_01"); JSONObject jsonObject = new JSONObject(); jsonObject.put("name","李华"); jsonObject.put("age",22); //插入文档 构造测试数据 List<Document> documents = new ArrayList<Document>(2048); for (int i = 0; i < 1000; i++) { Document document = new Document("title", "course-" + i). append("uuid", UUID.randomUUID()). append("price", 999.99). append("info",jsonObject.toJSONString()). append("created_time",new Date().getTime()); documents.add(document); } collection.insertMany(documents); // 遍历文档 // FindIterable<Document> findIterable = collection.find(); // MongoCursor<Document> mongoCursor = findIterable.iterator(); // while(mongoCursor.hasNext()){ // System.out.println(mongoCursor.next()); // } // 获取列类型 Set<String> keys = new HashSet<>(); for (Document doc : collection.find().limit(5)) { keys.addAll(doc.keySet()); } keys.stream().forEach( k -> System.out.println("col = " +k)); System.out.println("Count ---> " + collection.countDocuments()); } catch (Exception e) { System.err.println(e.getClass().getName() + ": " + e.getMessage()); } } }
仅仅是设置query为空即可,可以参考官网 ,这里需要说明的是mongo的时间会在抽取过来的时候加上8个小时,也就是UTC时间变为了GMT+8,以下是个对比
Hive中的时间转换如下:
---- 将mongodb的UTC时间转化为GMT+8时间 28800/3600=8 小时
hive> select from_unixtime(unix_timestamp(regexp_replace('2021-03-08T22:05:02.952Z', 'T|Z', ' '))+28800,'yyyy-MM-dd HH:mm:ss');
OK
2021-03-09 06:05:02
Time taken: 0.295 seconds, Fetched: 1 row(s)
这说明,DataX在抽取的时候已经自动对其进行了转化。
示例脚本如下:
{ "job":{ "content":[ { "reader":{ "name":"mongodbreader", "parameter":{ "address":[ "dds-xxx.mongodb.rds.aliyuncs.com:3717", "dds-xxx.mongodb.rds.aliyuncs.com:3717" ], "collectionName":"messageResponseResult", "column":[ { "index":0, "name":"_id", "type":"string" }, { "index":1, "name":"eventType", "type":"string" }, { "index":2, "name":"message", "type":"string" }, { "index":3, "name":"traceId", "type":"string" }, { "index":4, "name":"receiverId", "type":"string" }, { "index":5, "name":"senderWeChatId", "type":"string" }, { "index":6, "name":"stuId", "type":"int" }, { "index":7, "name":"les_id", "type":"int" }, { "index":8, "name":"status", "type":"int" }, { "index":9, "name":"createTime", "type":"date" }, { "index":10, "name":"updateTime", "type":"date" } ], "dbName":"act_operation_message", "userName":"xxxx", "userPassword":"xxx", "query":"{\"updateTime\":{\"$gte\":ISODate(\"2021-03-16T00:00:00.000Z\"),\"$lt\":ISODate(\"2021-03-17T00:00:00.000Z\")}}" } }, "writer":{ "name":"hdfswriter", "parameter":{ "column":[ { "name":"_id", "type":"string" }, { "name":"eventType", "type":"string" }, { "name":"message", "type":"string" }, { "name":"traceId", "type":"string" }, { "name":"receiverId", "type":"string" }, { "name":"senderWeChatId", "type":"string" }, { "name":"stuId", "type":"int" }, { "name":"les_id", "type":"int" }, { "name":"status", "type":"int" }, { "name":"createTime", "type":"string" }, { "name":"updateTime", "type":"string" } ], "defaultFS":"hdfs://nameservice1", "haveKerberos":"true", "kerberosKeytabFilePath":"/home/zmbigdata/kerberos/zm_app_prd.keytab", "kerberosPrincipal":"zm_app_prd@FAYSON.COM", "encoding":"UTF-8", "fileType":"orc", "fileName":"ods_message_response_result_df", "path":"/user/hive/warehouse/test_ods.db/ods_message_response_result_df/pt=2021-03-17", "writeMode":"append", "compress":"SNAPPY", "fieldDelimiter":"\u0001" } } } ], "setting":{ "speed":{ "channel":"2" }, "errorLimit":{ "record":0 } } } }
增量数据抽取数据量对比:
#mongdb查询: db.messageResponseResult.find({"updateTime":{"$gte":ISODate("2021-03-16T00:00:00.000Z"),"$lt":ISODate("2021-03-17T00:00:00.000Z")}}).count(true) 6781 #datax同步: "query":"{\"updateTime\":{\"$gte\":ISODate(\"2021-03-16T00:00:00.000Z\"),\"$lt\":ISODate(\"2021-03-17T00:00:00.000Z\")}}" 2021-03-17 17:40:36.978 [job-0] INFO JobContainer - 任务启动时刻 : 2021-03-17 17:40:25 任务结束时刻 : 2021-03-17 17:40:36 任务总计耗时 : 11s 任务平均流量 : 105.99KB/s 记录写入速度 : 678rec/s 读出记录总数 : 6781 读写失败总数 : 0 # 数据量可以看出是一致的。
将updateTime type改为Date后对结果进行了再次check,发现时间还是多了8个小时,如下:
hive> select `_id`,message,updatetime from ods_message_response_result_df where pt = '2021-03-17' and `_id`='60462f0e2d23b30f871dcef9';
OK
60462f0e2d23b30f871dcef9 设备不在线或内部处理异常 2021-03-09 06:05:02
Time taken: 23.031 seconds, Fetched: 1 row(s)
关于时间差问题,可以在开发的时候就已经转化好,这样在提取的时候就是正常东八区时间了。
参考:
https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md
mongoDB数据类型:https://www.cnblogs.com/YWDCB/p/9453887.html
MongoDB存取时间差问题解决方案:https://www.jb51.net/article/147182.htm
MongDB时区问题:https://cloud.tencent.com/developer/article/1446798
如果对你有帮助,不妨点个赞
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。