当前位置:   article > 正文

python初学者爬虫教程(五)提升爬虫的速度_爬虫加速

爬虫加速

并发,并行,同步,异步

  • 并发指在一个时间段内发生若干事件的情况,各个任务时间段短,经常切换,所以感觉是“同时”进行。

  • 并行值在同一时刻发生若干事件的情况,真正的同时

  • 同步,各个任务不是独自运行的,任务之间有交替顺序,运行完一个任务后才运行另一个。

  • 异步,各个任务独立运行

多线程爬虫

多线程爬虫是以并发方式执行。即通过进程的快速切换来加快爬虫速度。

单线程例子

通过访问1000个网站,来统计时间

由于运行时间较长,不展示结果

txt文本数据在这里

import requests
import time

link_list = []
with open('../alexa.txt', 'r') as file:
    file_list = file.readlines()
    for each in file_list:
        link = each.split('\t')[1]
        link = link.replace('\n', '')
        link_list.append(link)

    start = time.time()
    for each in link_list:
        try:
            r = requests.get(each)
            print(r.status_code, each)
        except Exception as e:
            print(e)
    end = time.time()
    print("串行总时间为", end - start)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

python中的多线程

函数式

调用_thread模块中的start_new_thread创建新线程

import _thread
import time


# 为线程定义一个函数
def print_time(threadName, delay):
    count = 0
    while count < 3:
        time.sleep(delay)
        count += 1
        print(threadName, time.ctime())


if __name__ == '__main__':
    try:
        _thread.start_new_thread(print_time, ("Thread-1", 1))
        _thread.start_new_thread(print_time, ("Thread-2", 2))
        print("Main Finished")
    except:
        print("Error: unable to start thread")

    while 1:
        pass

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

结果:
在这里插入图片描述
可以看到,主线程先完成。然后两个新的线程还会继续运行。

_thread.start_new_thread(function, args[, kwargs])
启动一个线程,并返回其标识符。线程会用 args 作为参数(必须是元组)执行 function 函数。可选的 kwargs 参数使用字典来指定有名参数。当函数返回时,线程会静默退出,当函数由于未处理的异常而中止时,会打印一条堆栈追踪信息,然后该线程会退出(但其他线程还是会继续运行)。

类包装式

threading 模块基于_thread提供了更易用的高级多线程 API。

调用Threading库创建线程,从threading.Thread继承

  • start()
    开始线程活动。
    它在一个线程里最多只能被调用一次。它安排对象的 run() 方法在一个独立的控制进程中调用。

  • run()
    代表线程活动的方法。

  • join(timeout=None)
    等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 – 不管是正常终结还是抛出未处理异常 – 或者直到发生超时,超时选项是可选的。
    当 timeout 参数存在而且不是 None 时,它应该是一个用于指定操作超时的以秒为单位的浮点数(或者分数)。因为 join() 总是返回 None ,所以你一定要在 join() 后调用 is_alive() 才能判断是否发生超时 – 如果线程仍然存活,则 join() 超时。
    当 timeout 参数不存在或者是 None ,这个操作会阻塞直到线程终结。
    一个线程可以被 join() 很多次。

  • is_alive()
    返回线程是否存活。

  • getName()

  • setName()

import threading
import time


class myThread(threading.Thread):
    def __init__(self, name, delay):
        threading.Thread.__init__(self)
        self.name = name
        self.delay = delay

    def run(self):
        print("Starting " + self.name)
        print_time(self.name, self.delay)
        print("Exiting " + self.name)


def print_time(threadName, delay):
    counter = 0
    while counter < 3:
        time.sleep(delay)
        print(threadName, time.ctime())
        counter += 1


threads = []

# 创建新线程
thread1 = myThread("Thread-1", 1)
thread2 = myThread("Thread-2", 2)

# 开启新线程
thread1.start()
thread2.start()

# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)

# 等待所有线程完成
for t in threads:
    t.join()

print("Exiting Main Thread")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

简单多线程爬虫

import requests
import time
import threading

link_list = []
with open('../alexa.txt', 'r') as file:
    file_list = file.readlines()
    for eachone in file_list:
        link = eachone.split('\t')[1]
        link = link.replace('\n', '')
        link_list.append(link)

start = time.time()


class myThread(threading.Thread):
    def __init__(self, name, link_range):
        threading.Thread.__init__(self)
        self.name = name
        self.link_range = link_range

    def run(self):
        print("Starting " + self.name)
        crawler(self.name, self.link_range)
        print("Exiting " + self.name)


def crawler(threadName, link_range):
    for i in range(link_range[0], link_range[1] + 1):
        try:
            r = requests.get(link_list[i])
            print(threadName, r.status_code, link_list[i])
        except Exception as e:
            print(e)


thread_list = []
link_range_list = [(0, 200), (201, 400), (401, 600), (601, 800), (801, 1000)]

# 创建新线程
for i in range(1, 6):
    thread = myThread("Thread-" + str(i), link_range_list[i - 1])
    thread.start()
    thread_list.append(thread)

# 等待所有线程完成
for thread in thread_list:
    thread.join()

end = time.time()
print('简单多线程爬虫的总时间为:', end - start)
print("Exiting Main Thread")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

使用thread_list = [] link_range_list = [(0, 200), (201, 400), (401, 600), (601, 800), (801, 1000)]将1000个网页分为5份,每一份200个网页

将这5份分给5个线程处理

问题:如果某个线程先完成200个网页的爬取,就会退出线程。此时5个线程就变成了4个线程,速度就有所下降,到最后只剩下一个线程时,就会变成单线程

使用Queue的多线程爬虫

import threading
import requests
import time
import queue as Queue

link_list = []
with open('../alexa.txt', 'r') as file:
    file_list = file.readlines()
    for eachone in file_list:
        link = eachone.split('\t')[1]
        link = link.replace('\n', '')
        link_list.append(link)

start = time.time()


class myThread(threading.Thread):
    def __init__(self, name, q):
        threading.Thread.__init__(self)
        self.name = name
        self.q = q

    def run(self):
        print("Starting " + self.name)
        while True:
            try:
                crawler(self.name, self.q)
            except:
                break
        print("Exiting " + self.name)


def crawler(threadName, q):
    url = q.get(timeout=2)
    try:
        r = requests.get(url, timeout=20)
        print(q.qsize(), threadName, r.status_code, url)
    except Exception as e:
        print(threadName, 'Error: ', e)


threadList = ["Thread-1", "Thread-2", "Thread-3", "Thread-4", "Thread-5"]
workQueue = Queue.Queue(1000)
threads = []

# 创建新线程
for tName in threadList:
    thread = myThread(tName, workQueue)
    thread.start()
    threads.append(thread)

# 填充队列
for url in link_list:
    workQueue.put(url)

# 等待所有线程完成
for t in threads:
    t.join()

end = time.time()
print('简单多线程爬虫的总时间为:', end - start)
print("Exiting Main Thread")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

多进程爬虫

使用multiprocessing(Process+Queue)的多进程爬虫

像线程一样管理进程

from multiprocessing import Process, Queue, cpu_count
import time
import requests

print(cpu_count())

link_list = []
with open('../alexa.txt', 'r') as file:
    file_list = file.readlines()
    for eachone in file_list:
        link = eachone.split('\t')[1]
        link = link.replace('\n', '')
        link_list.append(link)

start = time.time()


class MyProcess(Process):
    def __init__(self, q):
        Process.__init__(self)
        self.q = q

    def run(self):
        print("Starting ", self.pid)
        while not self.q.empty():
            crawler(self.q)
        print("Exiting ", self.pid)


def crawler(q):
    url = q.get(timeout=2)
    try:
        r = requests.get(url, timeout=20)
        print(q.qsize(), r.status_code, url)
    except Exception as e:
        print(q.qsize(), url, 'Error: ', e)


if __name__ == '__main__':
    ProcessNames = ["Process-1", "Process-2", "Process-3"]
    workQueue = Queue(1000)

    # 填充队列
    for url in link_list:
        workQueue.put(url)

    for i in range(0, 3):
        p = MyProcess(workQueue)
        p.daemon = True
        p.start()
        p.join()

    end = time.time()
    print('Process + Queue多进程爬虫的总时间为:', end - start)
    print('Main process Ended!')

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

p.daemon = True表示当父进程结束后,子进程也会被终止

使用Pool+Queue的多进程爬虫

前面使用Process动态生成进程,如果进程太多不方便,使用Pool——进程池。

from multiprocessing import Pool, Manager
import time
import requests

link_list = []
with open('../alexa.txt', 'r') as file:
    file_list = file.readlines()
    for eachone in file_list:
        link = eachone.split('\t')[1]
        link = link.replace('\n', '')
        link_list.append(link)

start = time.time()


def crawler(q, index):
    Process_id = 'Process-' + str(index)
    while not q.empty():
        url = q.get(timeout=2)
        try:
            r = requests.get(url, timeout=20)
            print(Process_id, q.qsize(), r.status_code, url)
        except Exception as e:
            print(Process_id, q.qsize(), url, 'Error: ', e)


if __name__ == '__main__':
    manager = Manager()
    workQueue = manager.Queue(1000)

    # 填充队列
    for url in link_list:
        workQueue.put(url)

    pool = Pool(processes=3)
    for i in range(4):
        pool.apply_async(crawler, args=(workQueue, i))

    print("Started processes")
    pool.close()
    pool.join()

    end = time.time()
    print('Pool + Queue多进程爬虫的总时间为:', end - start)
    print('Main process Ended!')

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

pool.apply_async为非阻塞方法,不需要等进程完就可以添加新进程
pool.apply为阻塞方法,等某个进程完了之后才会添加另一个进程

多协程爬虫

import gevent
from gevent.queue import Queue, Empty
import time
import requests

from gevent import monkey  # 把下面有可能有IO操作的单独做上标记

monkey.patch_all()  # 将IO转为异步执行的函数

link_list = []
with open('../alexa.txt', 'r') as file:
    file_list = file.readlines()
    for eachone in file_list:
        link = eachone.split('\t')[1]
        link = link.replace('\n', '')
        link_list.append(link)

start = time.time()


def crawler(index):
    Process_id = 'Process-' + str(index)
    while not workQueue.empty():
        url = workQueue.get(timeout=2)
        try:
            r = requests.get(url, timeout=20)
            print(Process_id, workQueue.qsize(), r.status_code, url)
        except Exception as e:
            print(Process_id, workQueue.qsize(), url, 'Error: ', e)


def boss():
    for url in link_list:
        workQueue.put_nowait(url)


if __name__ == '__main__':
    workQueue = Queue(1000)

    gevent.spawn(boss).join()
    jobs = []
    for i in range(10):
        jobs.append(gevent.spawn(crawler, i))
    gevent.joinall(jobs)

    end = time.time()
    print('gevent + Queue多协程爬虫的总时间为:', end - start)
    print('Main Ended!')

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/189697
推荐阅读
相关标签
  

闽ICP备14008679号