赞
踩
multiprocessing对于习惯使用threading多线程的用户非常友好,因为它的理念是像线程一样管理进程,和
threading很像,而且对于多核CPU的利用率比 threading高得多。当进程数量大于CPU的内核数量时,等待运行的进程会等到其他进程运行完毕让出内核为止。因此,如果CPU是单核,就无法进行多进程并行。在使用多进程爬虫之前,我们需要先了解计算机CPU的核心数量。
这里用到了 multiprocessing:
from multiprocessing import cpu_count
print(cpu_count())
运行上述代码,得到的结果是6,也就
是本机的CPU核心数为6
在这里使用3个进程,代码如下:
from multiprocessing import Process, Queue
import time
import requests
link_list = []
with open('alexa.txt', 'r') as file:
file_list = file.readlines()
for each in file_list:
link = each.replace('\n', '')
link_list.append(link)
class MyProcess(Process):
def __init__(self, q):
Process.__init__(self)
self.q = q
def run(self):
print("Starting ", self.pid)
while not self.q.empty():
self.crawler(self.q)
print("Exiting", self.pid)
def crawler(self, q):
url = q.get(timeout=2)
try:
r = requests.get(url, timeout=20)
print(q.qsize(), r.status_code, url)
except Exception as e:
print(q.qsize(), url, 'Error:', e)
if __name__ == '__main__':
workQueue = Queue(1010)
start = time.time()
# 填充队列
for url in link_list:
print(url)
workQueue.put(url)
print(workQueue)
for i in range(3):
p = MyProcess(workQueue)
p.daemon = True
p.start()
p.join()
end = time.time()
print('Process+ Queue多进程爬虫的总时间为:', end - start)
print('Main process Ended!')
在上述代码中,使用 multiprocessing的方式基本和 thread库类似。首先使用:from multiprocessing import
Process, Queue导入 multiprocessing库。值得注意的是,在 thread多线程中用来控制队列的
Queue库, multiprocessing自带了。和thread类似的是,在读取链接列表后创建了 MyProcess这个类,变量是 workQueue队列。该类的其他部分基本与 thread多线程类似。我们继续使用循环来添加进程,与多线程不同的是,在多进程中设置了daemono p. daemon=true,在多进程中,每个进程都可以单独设置它的属
性,如果将 daemon设置为True,当父进程结束后,子进程就会自动被终止。
当被操作对象数目不大时,可以直接利用multiprocessingProcess中的动态成生多个进程,十几个还好,但如果是上百个、上千个进程,手动地限制进程数量就太过烦琐,此时可以使用Pool发挥进程池的功效。
Pool可以提供指定数量的进程供用户调用。当有新的请求提交到pool中时,如果池还没有满,就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定的最大值,该请求就会继续等待,直到池中有进程结束才能够创建新的进程。
在使用Pool之前需要了解一下阻塞和非阻塞的概念。阻塞和非阻塞关注的是程序在等待调用结果(消息、返回值)时的状态。阻塞要等到回调结果出来,在有结果之前,当前进程会被挂起。非阻塞为添加进程后,不一定非要等到结果出来就可以添加其他进程运行。
首先,我们可以使用Pool的非阻塞方法和 Queue获取网页数据,代码如下:
from multiprocessing import Pool, Manager
import time
import requests
link_list = []
with open('alexa.txt', 'r') as file:
file_list = file.readlines()
for each in file_list:
link = each.replace('\n', '')
link_list.append(link)
def crawler(q, index):
process_id = 'Process--' + str(index)
while not q.empty():
url = q.get(timeout=2)
try:
r = requests.get(url, timeout=20)
print(process_id, q.qsize(), r.status_code, url)
except Exception as e:
print(process_id, q.qsize(), url, 'Error:', e)
if __name__ == '__main__':
manager = Manager()
workQueue = manager.Queue(1010)
start = time.time()
# 填充队列
for url in link_list:
workQueue.put(url)
pool = Pool(processes=3)
for i in range(4):
pool.apply_async(crawler,args=(workQueue, i))
print("Started processes")
pool.close()
pool.join()
end = time.time()
print('Pool+ Queue多进程爬虫的总时间为:', end - start)
print('Main process Ended!')
如果要将线程池Pool和 Queue结合,Queue的使用方式就需要改变,这里用到multiprocessingManger中的,使用 manager=Manager(和 workQueue=manager. Queue(1000)来创建队列。这个队列对象可以在父进程与子进程间通信。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。