赞
踩
之前爬虫使用的是requests+多线程/多进程,后来随着前几天的深入了解,才发现,对于爬虫来说,真正的瓶颈并不是CPU的处理速度,而是对于网页抓取时候的往返时间,因为如果采用requests+多线程/多进程,他本身是阻塞式的编程,所以时间都花费在了等待网页结果的返回和对爬取到的数据的写入上面。而如果采用非阻塞编程,那么就没有这个困扰。这边首先要理解一下阻塞和非阻塞的区别。
(1)阻塞调用是指调用结果返回之前,当前线程会被挂起(线程进入非可执行状态,在这个状态下,CPU不会给线程分配时间片,即线程暂停运行)。函数只有在得到结果之后才会返回。
(2)对于非阻塞则不会挂起,直接执行接下去的程序,返回结果后再回来处理返回值。
其实爬虫的本质就是client(客户端)发请求,批量获取server(服务端)的响应数据,如果我们有多个url待爬取,只用一个线程且采用串行的方式执行,那只能等待爬取一个结束后才能继续下一个,效率会非常低。需要强调的是:对于单线程下串行N个任务,并不完全等同于低效,如果这N个任务都是纯计算的任务,那么该线程对cpu的利用率仍然会很高,之所以单线程下串行多个爬虫任务低效,是因为爬虫任务是明显的IO密集型(阻塞)程序。那么该如何提高爬取性能呢?
阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。
常见的阻塞形式有:网络 I/O 阻塞、磁盘 I/O 阻塞、用户输入阻塞等。阻塞是无处不在的,包括 CPU 切换上下文时,所有的进程都无法真正干事情,它们也会被阻塞。如果是多核 CPU 则正在执行上下文切换操作的核不可被利用。
程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞的。
非阻塞并不是在任何程序级别、任何情况下都可以存在的。仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。
非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞的。
协程,英文叫做 Coroutine,又称微线程,纤程,协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。
协程本质上是个单进程,协程相对于多进程来说,无需线程上下文切换的开销,无需原子操作锁定及同步的开销,编程模型也非常简单。
我们可以使用协程来实现异步操作,比如在网络爬虫场景下,我们发出一个请求之后,需要等待一定的时间才能得到响应,但其实在这个等待过程中,程序可以干许多其他的事情,等到响应得到之后才切换回来继续处理,这样可以充分利用 CPU 和其他资源,这就是异步协程的优势。
import requests import time def get_page(url): print('下载 %s' % url) response = requests.get(url) if response.status_code == 200: return response.text def parse_page(res): # print(res) print('解析 %s' % (len(res))) def main(): urls = ['https://www.baidu.com/', 'http://www.sina.com.cn/', 'https://www.python.org'] for url in urls: res = get_page(url) # 调用一个任务,就在原地等待任务结束拿到结果后才继续往后执行/单步进行 parse_page(res) if __name__ == "__main__": start = time.time() main() end = time.time() print("花费总时间为 %s" % (end-start))
以上情况,如果遇到高并发需求时,需要花费时间成本过高,效率较低
解决方法如下:
综上所述,我们可以使用多进程的案例来测试一下
使用 multiprocessing 库:
import requests import time import multiprocessing def request(_): url = 'http://127.0.0.1:5000' # 添加异常处理机制来捕获和处理可能发生的异常 try: print('Waiting for', url) # 设置超时时间 result = requests.get(url, timeout=5).text print('获取到 response 地址来自', url, '页面返回结果:', result) except requests.exceptions.RequestException as e: print(f'请求失败: {e}') if __name__ == '__main__': start = time.time() # 获取计算机上的CPU核心数并且输出 cpu_count = multiprocessing.cpu_count() print('CPU核心数为:', cpu_count) # 创建进程池,进程池大小为cpu核心数 pool = multiprocessing.Pool(cpu_count) # 异常处理机制 try: # map方法是Pool类的一个成员方法,它类似于Python内置的map函数,但是它会使用进程池中的进程来并发地执行传入的函数。 pool.map(request, range(12)) finally: pool.close() # 关闭进程池,不再接受新的任务 pool.join() # 等待所有进程执行完毕 end = time.time() print('花费的总时间为:', end - start) # 在 Windows 上,multiprocessing 需要这个保护块来确保子进程只执行主模块中的代码,而不是尝试重新导入整个模块
注意事项:在 Windows 上,multiprocessing 需要这个保护块来确保子进程只执行主模块中的代码,而不是尝试重新导入整个模块
结果如下:
异步IO:就是你发起一个 网络IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。
2.1中我们已经=了解到,如果所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈
解决方法:可以用非阻塞接口来尝试解决这个问题。
上述无论哪种方案都没有解决一个性能相关的问题:IO阻塞,无论是多进程还是多线程,在遇到IO阻塞时都会被操作系统强行剥夺走CPU的执行权限,程序的执行效率因此就降低了下来。
解决这一问题的关键在于,我们自己从应用程序级别检测IO阻塞,然后切换到我们自己程序的其他任务执行,这样把我们程序的IO降到最低,我们的程序处于就绪态就会增多,以此来迷惑操作系统,操作系统便以为我们的程序是IO比较少的程序,从而会尽可能多的分配CPU给我们,这样也就达到了提升程序执行效率的目的。
实现方式:单线程+协程实现异步IO操作。
asyncio 是干什么的?
概念理解:
注意事项:在特殊函数内部不可以出现不支持异步模块相关的代码。(例:time,request)
import asyncio async def execute(x): print('Number:', x) # 当调用execute(1)时,并不会立即执行函数体。相反,它返回一个协程对象。协程对象表示一个待执行的异步操作 coroutine = execute(1) print('Coroutine协程对象:', coroutine) print('execute已被调用但还没有执行') # 通过asyncio.get_event_loop()获取当前的事件循环。 loop = asyncio.get_event_loop() # 使用loop.run_until_complete(coroutine)来运行之前创建的协程对象,直到它完成。这实际上会启动异步操作并等待其完成。 loop.run_until_complete(coroutine) print('协程已经运行完成')
如上,async 定义的方法就会变成一个无法直接执行的 coroutine 对象,必须将其注册到事件循环中才可以执行。
上文我们还提到了 task,它是对 coroutine 对象的进一步封装,它里面相比 coroutine 对象多了运行状态,比如 running、finished 等,我们可以用这些状态来获取协程对象的执行情况。
在上面的例子中,当我们将 coroutine 对象传递给 run_until_complete() 方法的时候,实际上它进行了一个操作就是将 coroutine 封装成了 task 对象, 我们也可以显式地进行声明,如下所示:
import asyncio # async def execute(x): # print('Number:', x) # # # # 当调用execute(1)时,并不会立即执行函数体。相反,它返回一个协程对象。协程对象表示一个待执行的异步操作 # coroutine = execute(1) # print('Coroutine协程对象:', coroutine) # print('execute已被调用但还没有执行') # # # 通过asyncio.get_event_loop()获取当前的事件循环。 # loop = asyncio.get_event_loop() # # 使用loop.run_until_complete(coroutine)来运行之前创建的协程对象,直到它完成。这实际上会启动异步操作并等待其完成。 # loop.run_until_complete(coroutine) # print('协程已经运行完成') # 将 coroutine 对象传递给 run_until_complete() 方法的时候,实际上它进行了一个操作就是将 coroutine 封装成了 task 对象,我们也可以显式地进行声明 async def execute(x): print('Number:', x) return x coroutine = execute(1) print('Coroutine协程对象:', coroutine) print('execute已被调用但还没有执行') loop = asyncio.get_event_loop() # 使用loop.create_task(coroutine)创建一个任务。 # 这个任务是一个将在事件循环中调度的包装器,它封装了原始的协程对象 task = loop.create_task(coroutine) # 输出任务对象状态/pending 状态 print('Task:', task) loop.run_until_complete(task) # 此时,任务对象的状态已经改变/finished # 还可以看到其 result 变成了 1,也就是我们定义的 execute() 方法的返回结果 print('Task:', task) print('协程已经运行完成')
另外,定义 task 对象还有一种方式,就是直接通过 asyncio 的 ensure_future() 方法,返回结果也是 task 对象,这样的话我们就可以不借助于 loop 来定义,即使我们还没有声明 loop 也可以提前定义好 task 对象,写法如下:
# import asyncio # async def execute(x): # print('Number:', x) # # # # 当调用execute(1)时,并不会立即执行函数体。相反,它返回一个协程对象。协程对象表示一个待执行的异步操作 # coroutine = execute(1) # print('Coroutine协程对象:', coroutine) # print('execute已被调用但还没有执行') # # # 通过asyncio.get_event_loop()获取当前的事件循环。 # loop = asyncio.get_event_loop() # # 使用loop.run_until_complete(coroutine)来运行之前创建的协程对象,直到它完成。这实际上会启动异步操作并等待其完成。 # loop.run_until_complete(coroutine) # print('协程已经运行完成') # 将 coroutine 对象传递给 run_until_complete() 方法的时候,实际上它进行了一个操作就是将 coroutine 封装成了 task 对象,我们也可以显式地进行声明 # async def execute(x): # print('Number:', x) # return x # # # coroutine = execute(1) # print('Coroutine协程对象:', coroutine) # print('execute已被调用但还没有执行') # # loop = asyncio.get_event_loop() # # 使用loop.create_task(coroutine)创建一个任务。 # # 这个任务是一个将在事件循环中调度的包装器,它封装了原始的协程对象 # task = loop.create_task(coroutine) # # 输出任务对象状态/pending 状态 # print('Task:', task) # # loop.run_until_complete(task) # # 此时,任务对象的状态已经改变/finished # # 还可以看到其 result 变成了 1,也就是我们定义的 execute() 方法的返回结果 # print('Task:', task) # print('协程已经运行完成') # 另外,定义 task 对象还有一种方式,就是直接通过 asyncio 的 ensure_future() 方法,返回结果也是 task 对象,这样的话我们就可以不借助于 loop 来定义,即使我们还没有声明 loop 也可以提前定义好 task 对象,写法如下: import asyncio async def execute(x): print('Number:', x) return x coroutine = execute(1) print('Coroutine协程对象:', coroutine) print('execute已被调用但还没有执行') task = asyncio.ensure_future(coroutine) print('Task:', task) loop = asyncio.get_event_loop() loop.run_until_complete(task) print('Task:', task) print('协程已经运行完成')
上面的例子我们只执行了一次请求,如果我们想执行多次请求应该怎么办呢?我们可以定义一个 task 列表,然后使用 asyncio 的 wait() 方法即可执行
# import asyncio # import requests # # # async def request(): # url = 'https://www.baidu.com' # status = requests.get(url).status_code # return status # # # tasks = [asyncio.ensure_future(request()) for _ in range(5)] # print('Tasks:', tasks) # # # 获取事件循环 # loop = asyncio.get_event_loop() # # 使用asyncio.wait(tasks)来等待所有任务完成 # loop.run_until_complete(asyncio.wait(tasks)) # for task in tasks: # print('Task Result:', task.result()) import asyncio import aiohttp async def request(session, url): async with session.get(url) as response: return response.status async def main(): url = 'https://www.baidu.com' async with aiohttp.ClientSession() as session: tasks = [asyncio.create_task(request(session, url)) for _ in range(5)] print('Tasks:', tasks) await asyncio.wait(tasks) for task in tasks: print('Task Result:', task.result()) loop = asyncio.get_event_loop() loop.run_until_complete(main())
这里我们使用一个 for 循环创建了五个 task,组成了一个列表,然后把这个列表首先传递给了 asyncio 的 wait() 方法,然后再将其注册到时间循环中,就可以发起五个任务了。最后我们再将任务的运行结果输出出来
可以看到五个任务被顺次执行了,并得到了运行结果
注意事项:获取事件循环并使用asyncio.wait(tasks)来等待所有任务完成。尽管使用了asyncio来管理任务,但由于request函数的同步性质,这些任务实际上会一个接一个地执行,而不是并发执行。(稍安勿躁,后面会说明如何进行并发执行)
上面的案例只是为后面的使用作铺垫,接下来我们正式来看下协程在解决 IO 密集型任务上有怎样的优势吧!
为了表现出协程的优势,我们需要先创建一个合适的实验环境,最好的方法就是模拟一个需要等待一定时间才可以获取返回结果的网页,上面的代码中使用了百度,但百度的响应太快了,而且响应速度也会受本机网速影响,所以最好的方式是自己在本地模拟一个慢速服务器,这里我们选用 Flask。
编写一个简单的服务器:
# 这里我们定义了一个 Flask 服务,主入口是 index() 方法,方法里面先调用了 sleep() 方法休眠 3 秒,然后接着再返回结果,也就是说,每次请求这个接口至少要耗时 3 秒,这样我们就模拟了一个慢速的服务接口。 from flask import Flask import time app = Flask(__name__) @app.route('/') def index(): time.sleep(3) return 'Hello!' if __name__ == '__main__': app.run(threaded=True) # 注意这里服务启动的时候,run() 方法加了一个参数 threaded,这表明 Flask 启动了多线程模式,不然默认是只有一个线程的。如果不开启多线程模式,同一时刻遇到多个请求的时候,只能顺次处理,这样即使我们使用协程异步请求了这个服务,也只能一个一个排队等待,瓶颈就会出现在服务端。所以,多线程模式是有必要打开的。
启动之后,Flask 应该默认会在 127.0.0.1:5000 上运行,运行之后控制台输出结果如下:
接下来我们再重新使用上面的方法请求一遍:
import asyncio import requests import time start = time.time() async def request(): url = 'http://127.0.0.1:5000' print('Waiting for', url) response = requests.get(url) print('获取到 response 地址来自', url, '页面返回结果:', response.text) tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print('程序所花费总时间:', end - start)
在这里我们还是创建了五个 task,然后将 task 列表传给 wait() 方法并注册到时间循环中执行。
运行结果如下:
可以发现和正常的请求并没有什么两样,依然还是顺次执行的,耗时 15 秒多,平均一个请求耗时 3 秒,说好的异步处理呢?
其实,要实现异步处理,我们得先要有挂起的操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样我们才能充分利用好资源,上面方法都是一本正经的串行走下来,连个挂起都没有,怎么可能实现异步?想太多了。
要实现异步,接下来我们再了解一下 await 的用法,使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。
所以,我们可能会将代码中的 request() 方法改成如下的样子:
async def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
response = await requests.get(url)
print('Get response from', url, 'Result:', response.text)
会出现以下报错
!我们必须要使用支持异步操作的请求方式才可以实现真正的异步,所以这里就需要 aiohttp 派上用场了。
aiohttp 是一个支持异步请求的库,利用它和 asyncio 配合我们可以非常方便地实现异步请求操作。
import aiohttp import asyncio import time start = time.time() # 这个函数是一个异步函数,它使用aiohttp.ClientSession来发送一个GET请求到指定的URL async def get_res(url): # 使用async with语句来确保会话在函数结束时被正确关闭 async with aiohttp.ClientSession() as session: response = await session.get(url) result = await response.text() return result # 异步函数,调用了get_res函数进行发起请求 async def request(): url = 'http://127.0.0.1:5000' print('url请求等待三秒:', url) result = await get_res(url) print('获取到 response 地址来自', url, '页面返回结果:', result) # 列表推导式,创建五个request协程任务 tasks = [asyncio.ensure_future(request()) for _ in range(5)] # 获取事件循环 loop = asyncio.get_event_loop() # 运行任务 loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print('协程花费总时间为:', end - start)
结果如下
成功了!我们发现这次请求的耗时由 15 秒变成了 3 秒,耗时直接变成了原来的 1/5。
代码里面我们使用了 await,后面跟了 get() 方法,在执行这五个协程的时候,如果遇到了 await,那么就会将当前协程挂起,转而去执行其他的协程,直到其他的协程也挂起或执行完毕,再进行下一个协程的执行。
开始运行时,时间循环会运行第一个 task,针对第一个 task 来说,当执行到第一个 await 跟着的 get() 方法时,它被挂起,但这个 get() 方法第一步的执行是非阻塞的,挂起之后立马被唤醒,所以立即又进入执行,创建了 ClientSession 对象,接着遇到了第二个 await,调用了 session.get() 请求方法,然后就被挂起了,由于请求需要耗时很久,所以一直没有被唤醒,好第一个 task 被挂起了,那接下来该怎么办呢?事件循环会寻找当前未被挂起的协程继续执行,于是就转而执行第二个 task 了,也是一样的流程操作,直到执行了第五个 task 的 session.get() 方法之后,全部的 task 都被挂起了。所有 task 都已经处于挂起状态,那咋办?只好等待了。3 秒之后,几个请求几乎同时都有了响应,然后几个 task 也被唤醒接着执行,输出请求结果,最后耗时,3 秒!
怎么样?这就是异步操作的便捷之处,当遇到阻塞式操作时,任务被挂起,程序接着去执行其他的任务,而不是傻傻地等着,这样可以充分利用 CPU 时间,而不必把时间浪费在等待 IO 上。
有人就会说了,既然这样的话,在上面的例子中,在发出网络请求后,既然接下来的 3 秒都是在等待的,在 3 秒之内,CPU 可以处理的 task 数量远不止这些,那么岂不是我们放 10 个、20 个、50 个、100 个、1000 个 task 一起执行,最后得到所有结果的耗时不都是 3 秒左右吗?因为这几个任务被挂起后都是一起等待的。
理论来说确实是这样的,不过有个前提,那就是服务器在同一时刻接受无限次请求都能保证正常返回结果,也就是服务器无限抗压,另外还要忽略 IO 传输时延,确实可以做到无限 task 一起执行且在预想时间内得到结果。
这里我i要插一嘴了,理论上是这样,但实际情况在做爬虫时候,会有很多公司会在这个方面进行反爬策略:就是小批量数据采集,可以允许,但是实现高并发的大量数据采集,会对其有所限制,当然,具体情况具体分析
我们这里将 task 数量设置成 100,再试一下,结果如下:
最后运行时间也是在 3 秒左右,当然多出来的时间就是 IO 时延了。
可见,使用了异步协程之后,我们几乎可以在相同的时间内实现成百上千倍次的网络请求,把这个运用在爬虫中,速度提升可谓是非常可观了。
在最新的 PyCon 2018 上,来自 Facebook 的 John Reese 介绍了 asyncio 和 multiprocessing 各自的特点,并开发了一个新的库,叫做 aiomultiprocess。需要 Python 3.6 及更高版本才可使用。
使用这个库,我们可以将上面的例子改写如下:
import asyncio import aiohttp import time from aiomultiprocess import Pool start = time.time() async def get(url): session = aiohttp.ClientSession() response = await session.get(url) result = await response.text() session.close() return result async def request(): url = 'http://127.0.0.1:5000' urls = [url for _ in range(100)] async with Pool() as pool: result = await pool.map(get, urls) return result coroutine = request() task = asyncio.ensure_future(coroutine) loop = asyncio.get_event_loop() loop.run_until_complete(task) end = time.time() print('Cost time:', end - start)
参考文档:
官方中文文档
asyncio — 异步 I/O
协程
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。