当前位置:   article > 正文

用 Python 高效处理大文件

weather_timestamp

为了进行并行处理,我们将任务划分为子单元。它增加了程序处理的作业数量,减少了整体处理时间。

例如,如果你正在处理一个大的CSV文件,你想修改一个单列。我们将把数据以数组的形式输入函数,它将根据可用的进程数量,一次并行处理多个值。这些进程是基于你的处理器内核的数量。

在这篇文章中,我们将学习如何使用multiprocessingjoblibtqdm Python包减少大文件的处理时间。这是一个简单的教程,可以适用于任何文件、数据库、图像、视频和音频。

开始

我们将使用来自 Kaggle 的 US Accidents (2016 - 2021) 数据集,它包括280万条记录和47个列。

https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents

我们将导入multiprocessingjoblibtqdm用于并行处理,pandas用于数据导入,renltkstring用于文本处理。

  1. # Parallel Computing
  2. import multiprocessing as mp
  3. from joblib import Parallel, delayed
  4. from tqdm.notebook import tqdm
  5. # Data Ingestion
  6. import pandas as pd
  7. # Text Processing
  8. import re
  9. from nltk.corpus import stopwords
  10. import string

在我们开始之前,让我们通过加倍cpu_count()来设置n_workers。正如你所看到的,我们有8个workers

  1. n_workers = 2 * mp.cpu_count()
  2. print(f"{n_workers} workers are available")
  3. >>> 8 workers are available

下一步,我们将使用pandas read_csv函数读取大型CSV文件。然后打印出dataframe的形状、列的名称和处理时间。

  1. %%time
  2. file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
  3. df = pd.read_csv(file_name)
  4. print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")

输出:

  1. Shape:(2845342, 47)
  2. Column Names:
  3. Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
  4. 'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
  5. 'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
  6. 'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
  7. 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
  8. 'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
  9. 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
  10. 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
  11. 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
  12. 'Astronomical_Twilight'],
  13. dtype='object')
  14. CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
  15. Wall time: 46.9 s

处理文本

clean_text是一个用于处理文本的简单函数。我们将使用nltk.copus获得英语停止词,并使用它来过滤掉文本行中的停止词。之后,我们将删除句子中的特殊字符和多余的空格。它将成为确定串行、并行和批处理的处理时间的基准函数。

  1. def clean_text(text):
  2. # Remove stop words
  3. stops = stopwords.words("english")
  4. text = " ".join([word for word in text.split() if word
  5. not in stops])
  6. # Remove Special Characters
  7. text = text.translate(str.maketrans('', '', string.punctuation))
  8. # removing the extra spaces
  9. text = re.sub(' +',' ', text)
  10. return text

串行处理

对于串行处理,我们可以使用pandas的.apply()函数,但是如果你想看到进度条,你需要为pandas激活tqdm,然后使用.progress_apply()函数。

我们将处理280万条记录,并将结果保存回 “Description” 列中。

  1. %%time
  2. tqdm.pandas()
  3. df['Description'] = df['Description'].progress_apply(clean_text)

输出

高端处理器串行处理280万行花了9分5秒。

  1. 100%
    声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/183043
    推荐阅读
    相关标签