赞
踩
在现实场景中,由于数据来源的异构,数据源的格式往往是难以统一的,这就导致大量具有价值的数据通常是以非结构化的形式聚合在一起的。对于这些非结构化数据,最常见的数据结构就是JSON,而对应的数据库就是MongoDB。
利用MongoDB这样的NoSQL数据库,我们可以把异构的数据源整合到若干个collection中,通过key-value的形式对数据进行增删改查。虽然MongoDB在数据聚合上有天然的优势,但是在事务处理(OLTP)与数据分析(OLAP)上的表现却不尽人意。由于MongoDB自身是一个文档型数据库,一方面,MongoDB 并没有事务的概念,所以在需要保证数据一致性的场景下并不好用。另一方面,MongoDB的join查询也没有RDBMS来得直观方便,所以在需要多表关联查询的场景下也非常捉急。
通常,对于小数据集,我们都会将数据导入到类似MySQL这样的RDBMS中做进一步的结构化处理,对于大数据集则可以通过Hive导入到HDFS上。那么,将MongoDB的非结构化数据导入到RDBMS中的最优方案又是什么呢?本文将对非结构化数据与结构化数据的管道构建做详细的讨论。
从Mongo迁移数据到MySQL,我首先想到了用Python从Mongo读取数据到内存中然后再批量写入MySQL。
这里,我选择了使用pymongo。
%% bash
pip install pymongo
# 这里不需要安装 Mongo的client
首先是读取Mongo数据
from pymongo import MongoClient
client = MongoClient('192.168.1.100', 27017)
db = client['tesedb']
posts = db.test_collection
condition = {'_id':'harryzhu'}
result_set = posts.find(condition)
for i in result_set:
print(i)
这里由于python会有中文的问题,我自己定义了一个将unicode转为utf-8的函数:
def getMongoData(data,field):
if data[field] is None:
return("")
else:
if isinstance(data[field], unicode):
return(data[field].encode("utf-8"))
else:
return(data[field])
接着,准备往MySQL中导入数据
%% bash
pip install MySQL-python
# 这里需要安装 MySQL的client
import MySQLdb
db = MySQLdb.connect("192.168.1.100","root","harryzhu","testdb" )
cursor = db.cursor()
values = r"(\'{id}\',\'{value}\',\'{datetime}\',\'{stock_code}\',\'{share}\')".format(id=getMongoData(i,"_id"),value=getMongoData(i,"value"),datetime=getMongoData(i,"datetime"),stock_code=getMongoData(i,"stock_code"),share=getMongoData(i,"share"))
sql = r"INSERT INTO `FinanceR` (`id`,`value`,`datetime`,`stock_code`,`share`) VALUES " + values
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
db.close()
从Mongo中读取的JSON需要在这里拼接SQL语句是一件用户体验非常糟糕的事情。在尝试拼接sql 2个小时后,我果断放弃了用Python导数据的想法。
由于拼接SQL是非常蛋疼的一件事情,我想到了利用R中的data frame直接完成数据的插入黑魔法。
首先,同样是需要将数据从Mongo中读取出来.在尝试使用RMongo,rmongodb以及mongolite之后,我依然选择了比较古老的RMongo。在使用的过程中,这三个包都有各自的问题。
rmongodb的教程含糊不清,看了很久都没有找到调用远程mongo数据库的case,而出于jeroenooms大人之手的后起之秀mongolite则在导入数据的时候果断的丢失了非常关键的_id字段,在安装最新包之后会出现jsonlite依赖包的异常。RMongo则在读取数据时将两个中文字段混淆成了一个字段,导致整个数据结构错乱。
三条路子全军覆没,这让我情何以堪,好在使用R的经验颇丰,通过中文的转换和切割就轻松解决了这个问题。
下面演示一下如何使用RMongo解决Mongo数据的读取:
install.packages("RMongo")
Sys.setenv("JAVA_HOME"="/usr/bin/java")
library(RMongo)
# 这里不需要安装 Mongo的client
library(dplyr)
# 设置数据库
FinanceR <- RMongo::mongoDbConnect(host="192.168.1.100")
dbShowCollections(FinanceR)
# 设置查询语句
condition = "{}"
# 得到返回结果
output <- RMongo::dbGetQuery(FinanceR, collection="portfolio_20160619", condition, skip=0, limit=2)
input <- output %>%
dplyr::mutate(stock_name1 = iconv(strsplit(iconv(stock_name,from = "utf-8", to = "gbk"),"\100")[[1]][1],from = "gbk",to = "utf-8"))%>%
dplyr::mutate(portfolio_name = iconv(strsplit(iconv(stock_name,from = "utf-8", to = "gbk"),"\100")[[1]][2],from = "gbk",to = "utf-8"))%>%
dplyr::select(-stock_name)%>%
rbind(cum_value="",risk="")
dbDisconnect(FinanceR)
现在,我们已经把Mongo中的数据成功导入到了内存中,下面将讲解如何使用黑魔法直接将整个data frame压入数据库。
install.packages("RMySQL")
library(RMySQL)
# 这里不需要安装 MySQL的client
con <- RMySQL::dbConnect(RMySQL::MySQL(),
user="FinanceR", password="FinanceR",
dbname="FinanceR", host="192.168.1.100")
# 选择追加,而不是重写的方式来添加数据,否则数据库的schema会被重写
RMySQL::dbWriteTable(con,input,row.names = FASLE, overwrite = FALSE,
append = TRUE)
on.exit(dbDisconnect(con))
Python 和 R 的方式各有利弊,对于Python而言目前简单粗暴的方式就是SQL拼接,而R则在流水化上比较困难。所以最后转向寻求Shell命令的方式,通过调用 Mongo client 和 MySQL client 的 API 来完成整个数据的转化操作。
经过研究发现 Mongo Client 提供了将数据转成 csv 格式的接口,之后mysql则通过load命令可以将数据加载到数据库中。
通过将非结构化数据转化为 data frame 后直接压入 RDBMS,一方面省去了枯燥的SQL拼接,一方面操作又直观清晰不易出错,对于小数据集合,直接用这样的方法增量导入数据不失为一种好方法,对于批量数据的流水化则采用Shell脚本调用Mongo和MySQL客户端命令较为合适。
转载自作者HarryZhu的FinanceR专栏:https://segmentfault.com/blog/harryprince
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。