How Python Handles Big Files
The Python programming language has become more and more popular for data analysis and processing because of some unique advantages: it is easy to read and maintain, and pandas, a fast, flexible and easy-to-use data analysis and manipulation library built on top of Python, packages a rich set of functions and methods. pandas is one of the big reasons Python has become an efficient and powerful data analysis environment.
pandas is memory-based. It does a great job when the data to be manipulated fits into memory, but it becomes inconvenient, or even impossible, to use when the data cannot be loaded into memory in one piece. Large files, such as data exported from a database or downloaded from the web, are common in real-world business, and we need ways to handle them. How? That is what I would like to talk about here.
By "big data" here, I am not talking about TB- or PB-level data that requires distributed processing. I mean GB-level files that cannot fit into an ordinary PC's memory but can be stored on disk. This is the more common big-file processing scenario.
Since a big file cannot be loaded into memory at once, we often need to retrieve it line by line or chunk by chunk for further processing. Both Python and pandas support this style of retrieval, but neither has a cursor mechanism. Because of that, we need to write the chunk-by-chunk retrieval code ourselves before it can be used in functions and methods; sometimes we even have to implement the functions and methods themselves. Below I list the typical big-file processing scenarios with code examples, to show how Python deals with them.
I. Aggregation
A simple aggregation traverses the values in the target column and performs a calculation according to the specified aggregate operation: sum adds up the traversed values; count records how many values were traversed; mean adds up and counts the values and then divides the sum by the count. Here let's look at how Python does a sum.
Below is a part of a file:
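For illustration only, assume a tab-separated layout along the following lines, with state as the fourth column and amount as the fifth, which is what the code below relies on; the column names other than state and amount are hypothetical:

orderid	client	sellerid	state	amount
1	UJRNP	17	New York	392.0
2	SJCH	6	Ohio	4802.0
3	HL	16	Texas	13500.0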
To calculate the total sales amount, that is, to sum the amount column:
1. Retrieve file line by line
total = 0
with open("orders.txt", "r") as f:
    line = f.readline()    # skip the header row
    while True:
        line = f.readline()
        if not line:
            break
        total += float(line.split("\t")[4])
print(total)
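As a side note, the hard-coded column index [4] breaks if the column order changes. The standard library's csv module can address the column by its header name instead; a minimal variant under the same assumptions about the file:

import csv

total = 0
with open("orders.txt", "r", newline="") as f:
    for row in csv.DictReader(f, delimiter="\t"):    # read fields by column name instead of position
        total += float(row["amount"])
print(total)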
2. Retrieve file chunk by chunk in pandas
pandas supports data retrieval chunk by chunk. Below is the workflow diagram:
import pandas as pd

chunk_data = pd.read_csv("orders.txt", sep="\t", chunksize=100000)
total = 0
for chunk in chunk_data:
    total += chunk['amount'].sum()
print(total)
pandas is good at retrieving and processing data in large chunks. In theory, the bigger the chunk size, the faster the processing, as long as the chunk still fits into the available memory. If chunksize is set to 1, retrieval becomes line by line and is extremely slow, so I do not recommend line-by-line retrieval when handling large files in pandas.
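The count and mean operations mentioned earlier follow the same chunked pattern. A mean, for example, has to carry a running sum and a running count across chunks rather than averaging per-chunk means; a minimal sketch, assuming the same orders.txt file:

import pandas as pd

total = 0
count = 0
for chunk in pd.read_csv("orders.txt", sep="\t", chunksize=100000):
    total += chunk['amount'].sum()
    count += len(chunk)
print(total / count)    # overall mean of the amount column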
II. Filtering
The workflow diagram for filtering in pandas:
Similar to aggregation, pandas divides the big file into multiple chunks (n chunks), filters each data chunk and concatenates the filtering results.
To get the sales records for New York state from the above file:
1. With small data sets
import pandas as pd

chunk_data = pd.read_csv("orders.txt", sep="\t", chunksize=100000)
chunk_list = []
for chunk in chunk_data:
    chunk_list.append(chunk[chunk.state == "New York"])
res = pd.concat(chunk_list)
print(res)
2. With big data sets
import pandas as pd

chunk_data = pd.read_csv("orders.txt", sep="\t", chunksize=100000)
n = 0
for chunk in chunk_data:
    need_data = chunk[chunk.state == 'New York']
    if n == 0:
        need_data.to_csv("orders_filter.txt", index=None)
        n += 1
    else:
        need_data.to_csv("orders_filter.txt", index=None, mode='a', header=None)
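A slightly tighter variant of the same idea passes an open file handle to to_csv and derives the header flag from the chunk index, so the manual counter is not needed; a sketch under the same file-name assumptions:

import pandas as pd

chunk_data = pd.read_csv("orders.txt", sep="\t", chunksize=100000)
with open("orders_filter.txt", "w", newline="") as out:
    for i, chunk in enumerate(chunk_data):
        # Write the header only with the first chunk; later chunks are appended to the open handle
        chunk[chunk.state == "New York"].to_csv(out, index=False, header=(i == 0), sep="\t")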
The logic of aggregation and filtering is simple, but because Python does not provide a cursor data type, we still need to write quite a lot of code to get them done.
III. Sorting
The workflow diagram for sorting in pandas:
Sorting is complicated because you need to:
1. Retrieve one chunk at a time;
2. Sort the chunk;
3. Write the sorted result of each chunk to a temporary file;
4. Maintain a list of k elements (k is the number of chunks), into which one row of data from each temporary file is put;
5. Sort the records in the list by the sorting field (in the same direction as in step 2);
6. Write the record with the smallest (for ascending order) or largest (for descending order) value to the result file;
7. Put the next row from the temporary file whose record was just written into the list;
8. Repeat steps 6 and 7 until all records are written to the result file (steps 4 to 8 form a k-way merge; see the sketch below).
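Steps 4 to 8 amount to a k-way merge of the sorted temporary files. For reference, the standard library's heapq.merge implements exactly this pattern. Below is a minimal sketch, not the program used in this article, assuming the temporary files are tab-separated, already sorted by the key, and have had their header rows stripped; merge_temp_files and the column index 4 are illustrative assumptions:

import heapq

# Hypothetical helper: k-way merge of sorted temporary files (steps 4 to 8 above).
# Assumes tab-separated lines sorted by the numeric value in column index 4,
# with no header rows left in the temporary files.
def merge_temp_files(temp_paths, out_path):
    files = [open(p, "r") for p in temp_paths]
    try:
        with open(out_path, "w") as out:
            merged = heapq.merge(*files, key=lambda line: float(line.split("\t")[4]))
            out.writelines(merged)
    finally:
        for f in files:
            f.close()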
To sort the above file by amount in ascending order, here is a complete Python program implementing the external sorting algorithm:
import pandas as pd
import os
import time
import shutil
import uuid
import traceback

def parse_type(s):
    # Convert a field to int or float where possible; otherwise keep it as a string
    if s.isdigit():
        return int(s)
    try:
        res = float(s)
        return res
    except:
        return s

def pos_by(by, head, sep):
    # Find the position of the sorting column `by` in the header line
    by_num = 0
    for col in head.split(sep):
        if col.strip() == by:
            break
        else:
            by_num += 1
    return by_num

def merge_sort(directory, ofile, by, ascending=True, sep=","):
    # Merge the sorted temporary files in `directory` into the result file `ofile`
    with open(ofile, 'w') as outfile:
        file_list = os.listdir(directory)
        file_chunk = [open(directory + "/" + file, 'r') for file in file_list]
        k_row = [file_chunk[i].readline() for i in range(len(file_chunk))]
        by = pos_by(by, k_row[0], sep)
        outfile.write(k_row[0])    # write the header row once
    k_row = [file_chunk[i].readline() for i in range(len(file_chunk))]
    k_by = [parse_type(k_row[i].split(sep)[by].strip()) for i in range(len(file_chunk))]
    with open(ofile, 'a') as outfile:
        while True:
            for i in range(len(k_by)):
                if i >= len(k_by):
                    break
                sorted_k_by = sorted(k_by) if ascending else sorted(k_by, reverse=True)
                if k_by[i] == sorted_k_by[0]:
                    # Row i holds the current smallest (or largest) key: write it out
                    outfile.write(k_row[i])
                    k_row[i] = file_chunk[i].readline()
                    if not k_row[i]:
                        # This temporary file is exhausted
                        file_chunk[i].close()
                        del(file_chunk[i])
                        del(k_row[i])
                        del(k_by[i])
                    else:
                        k_by[i] = parse_type(k_row[i].split(sep)[by].strip())
            if len(k_by) == 0:
                break

def external_sort(file_path, by, ofile, tmp_dir, ascending=True, chunksize=50000, sep=',', usecols=None, index_col=None):
    # Sort each chunk in memory, write it to a temporary file, then merge the temporary files
    os.makedirs(tmp_dir, exist_ok=True)
    try:
        data_chunk = pd.read_csv(file_path, sep=sep, usecols=usecols, index_col=index_col, chunksize=chunksize)
        for chunk in data_chunk:
            chunk = chunk.sort_values(by, ascending=ascending)
            chunk.to_csv(tmp_dir + "/" + "chunk" + str(int(time.time() * 10**7)) + str(uuid.uuid4()) + ".csv", index=None, sep=sep)
        merge_sort(tmp_dir, ofile=ofile, by=by, ascending=ascending, sep=sep)
    except Exception:
        print(traceback.format_exc())
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)

if __name__ == "__main__":
    infile = "D:/python_question_data/orders.txt"
    ofile = "D:/python_question_data/extra_sort_res_py.txt"
    tmp = "D:/python_question_data/tmp"
    external_sort(infile, 'amount', ofile, tmp, ascending=True, chunksize=1000000, sep='\t')
Python handles the external sort using line-by-line merge and write. I did not use pandas for the merge because it is incredibly slow at line-wise retrieval, although it is fast at chunk-wise operations. You can compare their speeds if you want to.
This code is far more complicated than that for aggregation and filtering; it is beyond a non-professional programmer's ability. The second problem is that it is slow to execute.
The third problem is that it only works for standard structured files and single-column sorting. If the file has no header row, if the number of separators varies from row to row, if the sorting column contains values in a nonstandard date format, or if there are multiple sorting columns, the code becomes even more complicated.
IV. Grouping
Grouping and summarizing a big file in Python is not easy either. A convenient workaround is to sort the file by the grouping column and then traverse the ordered file: neighboring records go into the same group when they have the same grouping column value, and a record starts a new group when its grouping value differs from the previous one. If a result set is too large, the grouping results have to be written out before memory runs out.
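To make the approach concrete, here is a minimal sketch of the traverse-after-sorting step, assuming the file has already been sorted by the grouping column (for example with the external sort above), is tab-separated, and that we sum amount per state; the column positions (state at index 3, amount at index 4) and the grouped_sum helper are illustrative assumptions:

from itertools import groupby

# Traverse a file already sorted by the grouping column and sum one column per group,
# writing each group's result as soon as it is complete so memory use stays flat.
def grouped_sum(sorted_path, out_path):
    with open(sorted_path, "r") as f, open(out_path, "w") as out:
        next(f)  # skip the header row
        rows = (line.rstrip("\n").split("\t") for line in f)
        for state, group in groupby(rows, key=lambda r: r[3]):
            out.write("%s\t%s\n" % (state, sum(float(r[4]) for r in group)))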
This sort-and-traverse approach is convenient but slow, because a full sort of the whole file is needed. Databases generally use hash grouping to increase speed. It is effective but much more complicated, and almost impossible for a non-professional to implement.
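That said, when the number of distinct groups is small enough to fit in memory, a plain dictionary can play the role of the hash table while the file is streamed chunk by chunk. A minimal sketch under that assumption, reusing the orders.txt layout from above; handling a hash table that outgrows memory is exactly the part that gets complicated:

import pandas as pd

# Hash-style grouping: accumulate per-group totals in a dictionary while streaming chunks.
# Works only while the set of distinct groups fits in memory.
totals = {}
for chunk in pd.read_csv("orders.txt", sep="\t", chunksize=100000):
    for state, amount in chunk.groupby('state')['amount'].sum().items():
        totals[state] = totals.get(state, 0) + amount
print(totals)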
So, handling big files in Python is inconvenient and difficult because of the absence of a cursor data type and the relevant functions. We have to write all the code ourselves, and the code is inefficient.
If only there were a language that a non-professional programmer could use to process large files. Luckily, we have esProc SPL.
It is convenient and easy to use, because SPL is designed to process structured data and comes with a richer function library than pandas as well as a built-in cursor data type. It handles large files concisely, effortlessly and efficiently.
1. Aggregation
A1=file(file_path).cursor@tc()
A2=A1.total(sum(col))
2. Filtering
A1=file(file_path).cursor@tc()
A2=A1.select(key==condition)
A3=A2.fetch()    / Fetch data from a small result set
A4=file(out_file).export@tc(A2)    / Write a large result set to a target file
3. Sorting
A1=file(file_path).cursor@tc()
A2=A1.sortx(key)
A3=file(out_file).export@tc(A2)
4. Grouping
A1=file(file_path).cursor@tc()
A2=A1.groups(key;sum(coli):total)    / Return a small result set directly
A3=A1.groupx(key;sum(coli):total)
A4=file(out_file).export@tc(A3)    / Write a large result set to a target file
SPL also employs the hash grouping algorithm mentioned above to effectively increase performance.
SPL has built-in parallel processing ability that makes the most of multi-core CPUs to boost performance. Only an @m option is needed to make a function compute in parallel.
A1=file(file_path).cursor@mtc()
A2=A1.groups(key;sum(coli):total)
There are plenty of Python approaches to parallel processing, but none of them is this simple.
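For example, a chunk-level parallel sum can be written with the standard multiprocessing module, but even this minimal sketch (the pool size and file names are assumptions) needs a guarded entry point, a picklable worker function, and still pays for serializing every chunk to the worker processes:

import pandas as pd
from multiprocessing import Pool

def chunk_sum(chunk):
    return chunk['amount'].sum()

if __name__ == '__main__':
    chunks = pd.read_csv("orders.txt", sep="\t", chunksize=100000)
    with Pool(4) as pool:
        # The parent still reads and pickles every chunk, which limits the speedup
        total = sum(pool.imap(chunk_sum, chunks))
    print(total)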
Translated from: https://medium.com/analytics-vidhya/how-python-handles-big-files-fc60ff90b819