赞
踩
为了进行并行处理,我们将任务划分为子单元。它增加了程序处理的作业数量,减少了整体处理时间。
例如,如果你正在处理一个大的CSV文件,你想修改一个单列。我们将把数据以数组的形式输入函数,它将根据可用的进程数量,一次并行处理多个值。这些进程是基于你的处理器内核的数量。
在这篇文章中,我们将学习如何使用multiprocessing
、joblib
和tqdm
Python包减少大文件的处理时间。这是一个简单的教程,可以适用于任何文件、数据库、图像、视频和音频。
开始
我们将使用来自 Kaggle 的 US Accidents (2016 - 2021) 数据集,它包括280万条记录和47个列。
https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents
我们将导入multiprocessing
、joblib
和tqdm
用于并行处理,pandas
用于数据导入,re
、nltk
和string
用于文本处理。
- # Parallel Computing
-
- import multiprocessing as mp
-
-
- from joblib import Parallel, delayed
-
-
- from tqdm.notebook import tqdm
-
-
- # Data Ingestion
-
-
- import pandas as pd
-
-
- # Text Processing
-
-
- import re
-
-
- from nltk.corpus import stopwords
-
-
- import string
在我们开始之前,让我们通过加倍cpu_count()
来设置n_workers
。正如你所看到的,我们有8个workers
。
- n_workers = 2 * mp.cpu_count()
-
-
- print(f"{n_workers} workers are available")
-
-
- >>> 8 workers are available
下一步,我们将使用pandas read_csv
函数读取大型CSV文件。然后打印出dataframe
的形状、列的名称和处理时间。
- %%time
- file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
- df = pd.read_csv(file_name)
-
-
- print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")
输出:
- Shape:(2845342, 47)
-
-
- Column Names:
-
-
- Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
- 'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
- 'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
- 'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
- 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
- 'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
- 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
- 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
- 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
- 'Astronomical_Twilight'],
- dtype='object')
-
-
- CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
- Wall time: 46.9 s
处理文本
clean_text
是一个用于处理文本的简单函数。我们将使用nltk.copus
获得英语停止词,并使用它来过滤掉文本行中的停止词。之后,我们将删除句子中的特殊字符和多余的空格。它将成为确定串行、并行和批处理的处理时间的基准函数。
- def clean_text(text):
- # Remove stop words
- stops = stopwords.words("english")
- text = " ".join([word for word in text.split() if word
- not in stops])
- # Remove Special Characters
- text = text.translate(str.maketrans('', '', string.punctuation))
- # removing the extra spaces
- text = re.sub(' +',' ', text)
- return text
串行处理
对于串行处理,我们可以使用pandas的.apply()
函数,但是如果你想看到进度条,你需要为pandas激活tqdm
,然后使用.progress_apply()
函数。
我们将处理280万条记录,并将结果保存回 “Description” 列中。
- %%time
- tqdm.pandas()
-
-
- df['Description'] = df['Description'].progress_apply(clean_text)
输出
高端处理器串行处理280万行花了9分5秒。
- 100% 声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/183043推荐阅读
相关标签
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。