赞
踩
前情提要:
工作原因需要处理一批约30G左右的CSV数据,数据量级不需要hadoop的使用,同时由于办公的本本内存较低的缘故,需要解读取数据时内存不足的原因。
操作流程:
方法与方式:首先是读取数据,常见的csv格式读取时一次性全部读取进来, 面对数据量较大(本次3亿条实车数据)时,需要 分批 并且有 选择性 的读取后 提取有效信息 删除冗余信息并清理内存。
同时,为了使处理数据时效率更高,将整理好的数据实时读取进来以后,保存成快速且可读的数据形式另行存储。然后释放内存并读取下一批数据直到整个流程结束
下面是操作代码:
#import pickle # pkl存储与 hdf5存储 import pandas as pd # 释放内存 import gc reader = pd.read_csv(r'E:\VEH_GBK_2019-01-01.csv', encoding='gbk',iterator=True,low_memory=False,usecols=[0,1,2,4]) title_mc=['location','vid','上报时间','充电状态'] loop = True chunkSize = 1000000 ans_vid={} location_list=['上海','重庆','广东','北京'] for i in location_list: ans_vid[i]=[] while loop: try: chunk = reader.get_chunk(chunkSize) chunk.columns=title_mc; chunk['充电状态']=chunk['充电状态'].astype(str) chunk['location']=chunk['location'].astype(str) for i in location_list: temp=chunk[chunk['location'].str.contains(i)] if temp[(temp['充电状态']=='1.0') | (temp['充电状态']=='4.0')].empty==False: ans_vid[i].append(temp[(temp['充电状态']=='1.0') | (temp['充电状态']=='4.0')]) del temp gc.collect() del chunk gc.collect() except StopIteration: loop = False print ("Iteration is stopped.") for i in location_list: ans_vid[i]=pd.concat(ans_vid[i]) location_list=['shanghai','chongqing','guangdong','beijing'] for i in location_list: ans_vid[i].to_hdf(i+'_charging.h5',key=ans_vid[i],encoding='gbk')
gc.collect()放在del 参数的后面用以及时释放内存。
读取的核心代码是:
reader = pd.read_csv(r'E:\VEH_GBK_2019-01-01.csv', encoding='gbk',iterator=True,low_memory=False,usecols=[0,1,2,4]) # usecols是读取原数据的某几列 chunkSize是分批读取的量级 chunk = reader.get_chunk(chunkSize)
本次读取的存储格式采用的是h5格式即hdf,该种格式易于读取较大数据量级,同时也有一些数据格式可以保存较大的数据量级: pkl ,npy等
推荐h5(保存dataframe)与pkl(保存字典格式),其读取速度更快.易于使用
h5格式调用pandas内置对dataframe的保存即可: 例 df是一个需要保存的较大的dataframe。代码为
1 |
|
pkl 保存需要先导入pickle ,所需保存的字典为ans_vid, 代码如下
# 导出 output = open('usage_top2veh.pkl', 'wb') pickle.dump(ans_vid, output) output.close() # 导入 pkl_file = open('usage_top2veh.pkl', 'rb') data2 = pickle.load(pkl_file) pkl_file.close()
——————————————活在当下,首先就是要做好当下的事.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。