赞
踩
业务背景
本次需求为数据迁移,采用的方式是脚本迁移。即我拿到导出来的数据后跑脚本入库,但个人觉得直接用内置接口快照恢复会更简便(泪目)。
阅读前提
本次的Json文件是一行一条数据。
遇到的问题和解决方法
读取文件的方式-> with open 优于 open,这个应该不用再细说了吧。
读取json文件时解析速度过慢,耗时多 -> 逐行读取
但这个逐行读取是有讲究的,一开始采用网上这种常见的方法:
- with open('file.txt', 'r') as f:
- lines = f.readlines()
- for line in lines:
- print(line)
这种方法的本质是先将整个文件读取到一个大列表中,再遍历。但问题是,这个大列表是占用内存的,因此是高CPU开销的,但服务器上只有32G内存,我需要性能更好的代码。后来发现:
- with open('file.txt', 'r') as f:
- for line in f:
- print(line)
使用以上方式更节省内存,并且是最优的,比分块读取,如f.read(8K)、f.readlines(1000)更优。原因在于,f为文件对象,采用缓存I/O来对文件进行读写操作,本身就具有内存管理,可以不用担心内存管理的问题。
文件读取的性能优化后,接下来是入库性能优化。
以前服务器资源充足时,一直使用bulk批处理 + 手动数据分批,直到这次才了解到流式批量执行(stream_bulk)、并发批量执行(parallel_bulk)。
首先阅读使用手册。发现python中yield的用法,可以编写数据迭代生成器,适合于批处理。于是进一步改进代码,和流式批处理结合进行。
- from elasticsearch.helpers import streaming_bulk
- from elasticsearch.helpers import parallel_bulk
- from elasticsearch.helpers import bulk, scan
-
-
- # 数据迭代生成器
- def generate_actions():
- with open('file.json') as f:
- for line in f:
- data = json.loads(line)
- yield data
-
- # 1. parallel_bulk(还可以用类似streaming_bulk的for循环)
- deque(parallel_bulk(client=self.es, index=index, doc_type="doc", actions=generate_actions(), chunk_size=3000, thread_count=32), maxlen=0)
-
- # 2. streaming_bulk
- for ok, action in streaming_bulk(client=self.es, index=index, doc_type="doc", actions=generate_actions(), max_retries=5):
- pass
-
- # 3. bulk
- bulk(client=self.es, doc_type="doc", index=index, actions=generate_actions())
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
代码详见我的GitHub仓库。记录了我在读取大文件时代码更迭的版本。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。