当前位置:   article > 正文

multiprocessing的多进程爬虫_multiprocessing.pool queue

multiprocessing.pool queue

multiprocessing的多进程爬虫

multiprocessing对于习惯使用threading多线程的用户非常友好,因为它的理念是像线程一样管理进程,和
threading很像,而且对于多核CPU的利用率比 threading高得多。当进程数量大于CPU的内核数量时,等待运行的进程会等到其他进程运行完毕让出内核为止。因此,如果CPU是单核,就无法进行多进程并行。在使用多进程爬虫之前,我们需要先了解计算机CPU的核心数量。
这里用到了 multiprocessing:

from multiprocessing import cpu_count
print(cpu_count())
  • 1
  • 2

运行上述代码,得到的结果是6,也就
是本机的CPU核心数为6
在这里使用3个进程,代码如下:

from multiprocessing import Process, Queue
import time
import requests

link_list = []
with open('alexa.txt', 'r') as file:
    file_list = file.readlines()
    for each in file_list:
        link = each.replace('\n', '')
        link_list.append(link)


class MyProcess(Process):
    def __init__(self, q):
        Process.__init__(self)
        self.q = q

    def run(self):
        print("Starting ", self.pid)
        while not self.q.empty():
            self.crawler(self.q)
        print("Exiting", self.pid)

    def crawler(self, q):
        url = q.get(timeout=2)
        try:
            r = requests.get(url, timeout=20)
            print(q.qsize(), r.status_code, url)
        except Exception as e:
            print(q.qsize(), url, 'Error:', e)


if __name__ == '__main__':
    workQueue = Queue(1010)
    start = time.time()
    # 填充队列
    for url in link_list:
        print(url)
        workQueue.put(url)
    print(workQueue)
    for i in range(3):
        p = MyProcess(workQueue)
        p.daemon = True
        p.start()
        p.join()
    end = time.time()
    print('Process+ Queue多进程爬虫的总时间为:', end - start)
    print('Main process Ended!')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

在上述代码中,使用 multiprocessing的方式基本和 thread库类似。首先使用:from multiprocessing import
Process, Queue导入 multiprocessing库。值得注意的是,在 thread多线程中用来控制队列的
Queue库, multiprocessing自带了。和thread类似的是,在读取链接列表后创建了 MyProcess这个类,变量是 workQueue队列。该类的其他部分基本与 thread多线程类似。我们继续使用循环来添加进程,与多线程不同的是,在多进程中设置了daemono p. daemon=true,在多进程中,每个进程都可以单独设置它的属
性,如果将 daemon设置为True,当父进程结束后,子进程就会自动被终止。

使用Pool+Queue的多进程爬虫

当被操作对象数目不大时,可以直接利用multiprocessingProcess中的动态成生多个进程,十几个还好,但如果是上百个、上千个进程,手动地限制进程数量就太过烦琐,此时可以使用Pool发挥进程池的功效。
Pool可以提供指定数量的进程供用户调用。当有新的请求提交到pool中时,如果池还没有满,就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定的最大值,该请求就会继续等待,直到池中有进程结束才能够创建新的进程。
在使用Pool之前需要了解一下阻塞和非阻塞的概念。阻塞和非阻塞关注的是程序在等待调用结果(消息、返回值)时的状态。阻塞要等到回调结果出来,在有结果之前,当前进程会被挂起。非阻塞为添加进程后,不一定非要等到结果出来就可以添加其他进程运行。
首先,我们可以使用Pool的非阻塞方法和 Queue获取网页数据,代码如下:

from multiprocessing import Pool, Manager
import time
import requests

link_list = []
with open('alexa.txt', 'r') as file:
    file_list = file.readlines()
    for each in file_list:
        link = each.replace('\n', '')
        link_list.append(link)


def crawler(q, index):
    process_id = 'Process--' + str(index)
    while not q.empty():
        url = q.get(timeout=2)
        try:
            r = requests.get(url, timeout=20)
            print(process_id, q.qsize(), r.status_code, url)
        except Exception as e:
            print(process_id, q.qsize(), url, 'Error:', e)


if __name__ == '__main__':
    manager = Manager()
    workQueue = manager.Queue(1010)
    start = time.time()
    # 填充队列
    for url in link_list:
        workQueue.put(url)
    pool = Pool(processes=3)
    for i in range(4):
        pool.apply_async(crawler,args=(workQueue, i))
    print("Started processes")
    pool.close()
    pool.join()
    end = time.time()
    print('Pool+ Queue多进程爬虫的总时间为:', end - start)
    print('Main process Ended!')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

如果要将线程池Pool和 Queue结合,Queue的使用方式就需要改变,这里用到multiprocessingManger中的,使用 manager=Manager(和 workQueue=manager. Queue(1000)来创建队列。这个队列对象可以在父进程与子进程间通信。

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号