当前位置:   article > 正文

python---协程与任务详解

python---协程与任务详解


前言

之前爬虫使用的是requests+多线程/多进程,后来随着前几天的深入了解,才发现,对于爬虫来说,真正的瓶颈并不是CPU的处理速度,而是对于网页抓取时候的往返时间,因为如果采用requests+多线程/多进程,他本身是阻塞式的编程,所以时间都花费在了等待网页结果的返回和对爬取到的数据的写入上面。而如果采用非阻塞编程,那么就没有这个困扰。这边首先要理解一下阻塞和非阻塞的区别。

(1)阻塞调用是指调用结果返回之前,当前线程会被挂起(线程进入非可执行状态,在这个状态下,CPU不会给线程分配时间片,即线程暂停运行)。函数只有在得到结果之后才会返回。

(2)对于非阻塞则不会挂起,直接执行接下去的程序,返回结果后再回来处理返回值。

其实爬虫的本质就是client(客户端)发请求,批量获取server(服务端)的响应数据,如果我们有多个url待爬取,只用一个线程且采用串行的方式执行,那只能等待爬取一个结束后才能继续下一个,效率会非常低。需要强调的是:对于单线程下串行N个任务,并不完全等同于低效,如果这N个任务都是纯计算的任务,那么该线程对cpu的利用率仍然会很高,之所以单线程下串行多个爬虫任务低效,是因为爬虫任务是明显的IO密集型(阻塞)程序。那么该如何提高爬取性能呢?


一. 基本概念了解与学习

1.1 阻塞

  • 阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。

  • 常见的阻塞形式有:网络 I/O 阻塞、磁盘 I/O 阻塞、用户输入阻塞等。阻塞是无处不在的,包括 CPU 切换上下文时,所有的进程都无法真正干事情,它们也会被阻塞。如果是多核 CPU 则正在执行上下文切换操作的核不可被利用。

1.2 非阻塞

  • 程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞的。

  • 非阻塞并不是在任何程序级别、任何情况下都可以存在的。仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。

  • 非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞的。

1.3 同步

  • 不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,称这些程序单元是同步执行的。例如购物系统中更新商品库存,需要用“行锁”作为通信信号,让不同的更新请求强制排队顺序执行,那更新库存的操作是同步的。简言之,同步意味着有序。

1.4 异步

  • 为完成某个任务,不同程序单元之间过程中无需通信协调,也能完成任务的方式,不相关的程序单元之间可以是异步的。例如,爬虫下载网页。调度程序调用下载程序后,即可调度其他任务,而无需与该下载任务保持通信以协调行为。不同网页的下载、保存等操作都是无关的,也无需相互通知协调。这些异步操作的完成时刻并不确定。简言之,异步意味着无序。

1.5 多进程

  • 多进程就是利用 CPU 的多核优势,在同一时间并行地执行多个任务,可以大大提高执行效率。

1.6 协程

  • 协程,英文叫做 Coroutine,又称微线程,纤程,协程是一种用户态的轻量级线程。

  • 协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。

  • 协程本质上是个单进程,协程相对于多进程来说,无需线程上下文切换的开销,无需原子操作锁定及同步的开销,编程模型也非常简单。

  • 我们可以使用协程来实现异步操作,比如在网络爬虫场景下,我们发出一个请求之后,需要等待一定的时间才能得到响应,但其实在这个等待过程中,程序可以干许多其他的事情,等到响应得到之后才切换回来继续处理,这样可以充分利用 CPU 和其他资源,这就是异步协程的优势。

二. 示例操作对比

2.1 同步调用

  • 同步调用:即提交一个任务后就在原地等待任务结束,等到拿到任务的结果后再继续下一行代码,效率低
import requests
import time


def get_page(url):
    print('下载 %s' % url)
    response = requests.get(url)
    if response.status_code == 200:
        return response.text


def parse_page(res):
    # print(res)
    print('解析 %s' % (len(res)))


def main():
    urls = ['https://www.baidu.com/', 'http://www.sina.com.cn/', 'https://www.python.org']
    for url in urls:
        res = get_page(url)  # 调用一个任务,就在原地等待任务结束拿到结果后才继续往后执行/单步进行
        parse_page(res)


if __name__ == "__main__":
    start = time.time()
    main()
    end = time.time()
    print("花费总时间为 %s" % (end-start))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 结果展示

在这里插入图片描述

  • 以上情况,如果遇到高并发需求时,需要花费时间成本过高,效率较低

  • 解决方法如下:

      1. 解决同步调用方案之多线程/多进程
        好处:在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。
        弊端:开启多进程或都线程的方式,我们是无法无限制地开启多进程或多线程的:在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。
      1. 解决同步调用方案之线程/进程池
        好处:很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。可以很好的降低系统开销。
        弊端:“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

2.2 多进程

综上所述,我们可以使用多进程的案例来测试一下
使用 multiprocessing 库:

import requests
import time
import multiprocessing


def request(_):
    url = 'http://127.0.0.1:5000'
    # 添加异常处理机制来捕获和处理可能发生的异常
    try:
        print('Waiting for', url)
        # 设置超时时间
        result = requests.get(url, timeout=5).text
        print('获取到 response 地址来自', url, '页面返回结果:', result)
    except requests.exceptions.RequestException as e:
        print(f'请求失败: {e}')


if __name__ == '__main__':
    start = time.time()
    # 获取计算机上的CPU核心数并且输出
    cpu_count = multiprocessing.cpu_count()
    print('CPU核心数为:', cpu_count)
    # 创建进程池,进程池大小为cpu核心数
    pool = multiprocessing.Pool(cpu_count)
    # 异常处理机制
    try:
        # map方法是Pool类的一个成员方法,它类似于Python内置的map函数,但是它会使用进程池中的进程来并发地执行传入的函数。
        pool.map(request, range(12))
    finally:
        pool.close()  # 关闭进程池,不再接受新的任务
        pool.join()  # 等待所有进程执行完毕

    end = time.time()
    print('花费的总时间为:', end - start)

# 在 Windows 上,multiprocessing 需要这个保护块来确保子进程只执行主模块中的代码,而不是尝试重新导入整个模块

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

注意事项:在 Windows 上,multiprocessing 需要这个保护块来确保子进程只执行主模块中的代码,而不是尝试重新导入整个模块

结果如下:
在这里插入图片描述

2.3 异步IO

  • 异步IO:就是你发起一个 网络IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。

  • 2.1中我们已经=了解到,如果所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈

  • 解决方法:可以用非阻塞接口来尝试解决这个问题。

  • 上述无论哪种方案都没有解决一个性能相关的问题:IO阻塞,无论是多进程还是多线程,在遇到IO阻塞时都会被操作系统强行剥夺走CPU的执行权限,程序的执行效率因此就降低了下来。
    解决这一问题的关键在于,我们自己从应用程序级别检测IO阻塞,然后切换到我们自己程序的其他任务执行,这样把我们程序的IO降到最低,我们的程序处于就绪态就会增多,以此来迷惑操作系统,操作系统便以为我们的程序是IO比较少的程序,从而会尽可能多的分配CPU给我们,这样也就达到了提升程序执行效率的目的。

  • 实现方式:单线程+协程实现异步IO操作。

三. 异步协程

  • 在python3.4之后新增了asyncio模块,可以帮我们检测IO(只能是网络IO【HTTP连接就是网络IO操作】),实现应用程序级别的切换(异步IO)。注意:asyncio只能发tcp级别的请求,不能发http协议。

asyncio 是干什么的?

  • 异步网络操作
  • 并发
  • 协程

概念理解:

  • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。
    coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
    task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。
    future:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。
    async关键字:async 定义一个协程;
    await 关键字:用来挂起阻塞方法的执行。

注意事项:在特殊函数内部不可以出现不支持异步模块相关的代码。(例:time,request)

3.1 定义协程

import asyncio


async def execute(x):
    print('Number:', x)

# 当调用execute(1)时,并不会立即执行函数体。相反,它返回一个协程对象。协程对象表示一个待执行的异步操作
coroutine = execute(1)
print('Coroutine协程对象:', coroutine)
print('execute已被调用但还没有执行')

# 通过asyncio.get_event_loop()获取当前的事件循环。
loop = asyncio.get_event_loop()
# 使用loop.run_until_complete(coroutine)来运行之前创建的协程对象,直到它完成。这实际上会启动异步操作并等待其完成。
loop.run_until_complete(coroutine)
print('协程已经运行完成')

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

如上,async 定义的方法就会变成一个无法直接执行的 coroutine 对象,必须将其注册到事件循环中才可以执行。

上文我们还提到了 task,它是对 coroutine 对象的进一步封装,它里面相比 coroutine 对象多了运行状态,比如 running、finished 等,我们可以用这些状态来获取协程对象的执行情况。

在上面的例子中,当我们将 coroutine 对象传递给 run_until_complete() 方法的时候,实际上它进行了一个操作就是将 coroutine 封装成了 task 对象, 我们也可以显式地进行声明,如下所示:

import asyncio


# async def execute(x):
#     print('Number:', x)
#
#
# # 当调用execute(1)时,并不会立即执行函数体。相反,它返回一个协程对象。协程对象表示一个待执行的异步操作
# coroutine = execute(1)
# print('Coroutine协程对象:', coroutine)
# print('execute已被调用但还没有执行')
#
# # 通过asyncio.get_event_loop()获取当前的事件循环。
# loop = asyncio.get_event_loop()
# # 使用loop.run_until_complete(coroutine)来运行之前创建的协程对象,直到它完成。这实际上会启动异步操作并等待其完成。
# loop.run_until_complete(coroutine)
# print('协程已经运行完成')


# 将 coroutine 对象传递给 run_until_complete() 方法的时候,实际上它进行了一个操作就是将 coroutine 封装成了 task 对象,我们也可以显式地进行声明
async def execute(x):
    print('Number:', x)
    return x


coroutine = execute(1)
print('Coroutine协程对象:', coroutine)
print('execute已被调用但还没有执行')

loop = asyncio.get_event_loop()
# 使用loop.create_task(coroutine)创建一个任务。
# 这个任务是一个将在事件循环中调度的包装器,它封装了原始的协程对象
task = loop.create_task(coroutine)
# 输出任务对象状态/pending 状态
print('Task:', task)

loop.run_until_complete(task)
# 此时,任务对象的状态已经改变/finished
# 还可以看到其 result 变成了 1,也就是我们定义的 execute() 方法的返回结果
print('Task:', task)
print('协程已经运行完成')

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

在这里插入图片描述

另外,定义 task 对象还有一种方式,就是直接通过 asyncio 的 ensure_future() 方法,返回结果也是 task 对象,这样的话我们就可以不借助于 loop 来定义,即使我们还没有声明 loop 也可以提前定义好 task 对象,写法如下:

# import asyncio
# async def execute(x):
#     print('Number:', x)
#
#
# # 当调用execute(1)时,并不会立即执行函数体。相反,它返回一个协程对象。协程对象表示一个待执行的异步操作
# coroutine = execute(1)
# print('Coroutine协程对象:', coroutine)
# print('execute已被调用但还没有执行')
#
# # 通过asyncio.get_event_loop()获取当前的事件循环。
# loop = asyncio.get_event_loop()
# # 使用loop.run_until_complete(coroutine)来运行之前创建的协程对象,直到它完成。这实际上会启动异步操作并等待其完成。
# loop.run_until_complete(coroutine)
# print('协程已经运行完成')


# 将 coroutine 对象传递给 run_until_complete() 方法的时候,实际上它进行了一个操作就是将 coroutine 封装成了 task 对象,我们也可以显式地进行声明
# async def execute(x):
#     print('Number:', x)
#     return x
#
#
# coroutine = execute(1)
# print('Coroutine协程对象:', coroutine)
# print('execute已被调用但还没有执行')
#
# loop = asyncio.get_event_loop()
# # 使用loop.create_task(coroutine)创建一个任务。
# # 这个任务是一个将在事件循环中调度的包装器,它封装了原始的协程对象
# task = loop.create_task(coroutine)
# # 输出任务对象状态/pending 状态
# print('Task:', task)
#
# loop.run_until_complete(task)
# # 此时,任务对象的状态已经改变/finished
# # 还可以看到其 result 变成了 1,也就是我们定义的 execute() 方法的返回结果
# print('Task:', task)
# print('协程已经运行完成')


# 另外,定义 task 对象还有一种方式,就是直接通过 asyncio 的 ensure_future() 方法,返回结果也是 task 对象,这样的话我们就可以不借助于 loop 来定义,即使我们还没有声明 loop 也可以提前定义好 task 对象,写法如下:
import asyncio


async def execute(x):
    print('Number:', x)
    return x


coroutine = execute(1)
print('Coroutine协程对象:', coroutine)
print('execute已被调用但还没有执行')

task = asyncio.ensure_future(coroutine)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('协程已经运行完成')

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

在这里插入图片描述

3.2 多任务协程

上面的例子我们只执行了一次请求,如果我们想执行多次请求应该怎么办呢?我们可以定义一个 task 列表,然后使用 asyncio 的 wait() 方法即可执行

# import asyncio
# import requests
#
#
# async def request():
#     url = 'https://www.baidu.com'
#     status = requests.get(url).status_code
#     return status
#
#
# tasks = [asyncio.ensure_future(request()) for _ in range(5)]
# print('Tasks:', tasks)
#
# # 获取事件循环
# loop = asyncio.get_event_loop()
# # 使用asyncio.wait(tasks)来等待所有任务完成
# loop.run_until_complete(asyncio.wait(tasks))
# for task in tasks:
#     print('Task Result:', task.result())

import asyncio
import aiohttp


async def request(session, url):
    async with session.get(url) as response:
        return response.status


async def main():
    url = 'https://www.baidu.com'
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(request(session, url)) for _ in range(5)]
        print('Tasks:', tasks)
        await asyncio.wait(tasks)
        for task in tasks:
            print('Task Result:', task.result())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

这里我们使用一个 for 循环创建了五个 task,组成了一个列表,然后把这个列表首先传递给了 asyncio 的 wait() 方法,然后再将其注册到时间循环中,就可以发起五个任务了。最后我们再将任务的运行结果输出出来

在这里插入图片描述

可以看到五个任务被顺次执行了,并得到了运行结果

注意事项:获取事件循环并使用asyncio.wait(tasks)来等待所有任务完成。尽管使用了asyncio来管理任务,但由于request函数的同步性质,这些任务实际上会一个接一个地执行,而不是并发执行。(稍安勿躁,后面会说明如何进行并发执行)

3.3 协程实现

上面的案例只是为后面的使用作铺垫,接下来我们正式来看下协程在解决 IO 密集型任务上有怎样的优势吧!

为了表现出协程的优势,我们需要先创建一个合适的实验环境,最好的方法就是模拟一个需要等待一定时间才可以获取返回结果的网页,上面的代码中使用了百度,但百度的响应太快了,而且响应速度也会受本机网速影响,所以最好的方式是自己在本地模拟一个慢速服务器,这里我们选用 Flask。
编写一个简单的服务器:

# 这里我们定义了一个 Flask 服务,主入口是 index() 方法,方法里面先调用了 sleep() 方法休眠 3 秒,然后接着再返回结果,也就是说,每次请求这个接口至少要耗时 3 秒,这样我们就模拟了一个慢速的服务接口。

from flask import Flask
import time

app = Flask(__name__)


@app.route('/')
def index():
    time.sleep(3)
    return 'Hello!'


if __name__ == '__main__':
    app.run(threaded=True)
# 注意这里服务启动的时候,run() 方法加了一个参数 threaded,这表明 Flask 启动了多线程模式,不然默认是只有一个线程的。如果不开启多线程模式,同一时刻遇到多个请求的时候,只能顺次处理,这样即使我们使用协程异步请求了这个服务,也只能一个一个排队等待,瓶颈就会出现在服务端。所以,多线程模式是有必要打开的。

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

启动之后,Flask 应该默认会在 127.0.0.1:5000 上运行,运行之后控制台输出结果如下:
在这里插入图片描述
接下来我们再重新使用上面的方法请求一遍:

import asyncio
import requests
import time

start = time.time()


async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = requests.get(url)
    print('获取到 response 地址来自', url, '页面返回结果:', response.text)


tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('程序所花费总时间:', end - start)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

在这里我们还是创建了五个 task,然后将 task 列表传给 wait() 方法并注册到时间循环中执行。
运行结果如下:
在这里插入图片描述

可以发现和正常的请求并没有什么两样,依然还是顺次执行的,耗时 15 秒多,平均一个请求耗时 3 秒,说好的异步处理呢?

其实,要实现异步处理,我们得先要有挂起的操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样我们才能充分利用好资源,上面方法都是一本正经的串行走下来,连个挂起都没有,怎么可能实现异步?想太多了。

要实现异步,接下来我们再了解一下 await 的用法,使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。

所以,我们可能会将代码中的 request() 方法改成如下的样子:

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await requests.get(url)
    print('Get response from', url, 'Result:', response.text)
  • 1
  • 2
  • 3
  • 4
  • 5

会出现以下报错

在这里插入图片描述
!我们必须要使用支持异步操作的请求方式才可以实现真正的异步,所以这里就需要 aiohttp 派上用场了。

3.4 使用 aiohttp

aiohttp 是一个支持异步请求的库,利用它和 asyncio 配合我们可以非常方便地实现异步请求操作。

import aiohttp
import asyncio
import time

start = time.time()


# 这个函数是一个异步函数,它使用aiohttp.ClientSession来发送一个GET请求到指定的URL
async def get_res(url):
    # 使用async with语句来确保会话在函数结束时被正确关闭
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
        result = await response.text()
        return result


# 异步函数,调用了get_res函数进行发起请求
async def request():
    url = 'http://127.0.0.1:5000'
    print('url请求等待三秒:', url)
    result = await get_res(url)
    print('获取到 response 地址来自', url, '页面返回结果:', result)


# 列表推导式,创建五个request协程任务
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
# 获取事件循环
loop = asyncio.get_event_loop()
# 运行任务
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('协程花费总时间为:', end - start)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

结果如下

在这里插入图片描述

成功了!我们发现这次请求的耗时由 15 秒变成了 3 秒,耗时直接变成了原来的 1/5。

代码里面我们使用了 await,后面跟了 get() 方法,在执行这五个协程的时候,如果遇到了 await,那么就会将当前协程挂起,转而去执行其他的协程,直到其他的协程也挂起或执行完毕,再进行下一个协程的执行。

开始运行时,时间循环会运行第一个 task,针对第一个 task 来说,当执行到第一个 await 跟着的 get() 方法时,它被挂起,但这个 get() 方法第一步的执行是非阻塞的,挂起之后立马被唤醒,所以立即又进入执行,创建了 ClientSession 对象,接着遇到了第二个 await,调用了 session.get() 请求方法,然后就被挂起了,由于请求需要耗时很久,所以一直没有被唤醒,好第一个 task 被挂起了,那接下来该怎么办呢?事件循环会寻找当前未被挂起的协程继续执行,于是就转而执行第二个 task 了,也是一样的流程操作,直到执行了第五个 task 的 session.get() 方法之后,全部的 task 都被挂起了。所有 task 都已经处于挂起状态,那咋办?只好等待了。3 秒之后,几个请求几乎同时都有了响应,然后几个 task 也被唤醒接着执行,输出请求结果,最后耗时,3 秒!

怎么样?这就是异步操作的便捷之处,当遇到阻塞式操作时,任务被挂起,程序接着去执行其他的任务,而不是傻傻地等着,这样可以充分利用 CPU 时间,而不必把时间浪费在等待 IO 上。

有人就会说了,既然这样的话,在上面的例子中,在发出网络请求后,既然接下来的 3 秒都是在等待的,在 3 秒之内,CPU 可以处理的 task 数量远不止这些,那么岂不是我们放 10 个、20 个、50 个、100 个、1000 个 task 一起执行,最后得到所有结果的耗时不都是 3 秒左右吗?因为这几个任务被挂起后都是一起等待的。

理论来说确实是这样的,不过有个前提,那就是服务器在同一时刻接受无限次请求都能保证正常返回结果,也就是服务器无限抗压,另外还要忽略 IO 传输时延,确实可以做到无限 task 一起执行且在预想时间内得到结果。

这里我i要插一嘴了,理论上是这样,但实际情况在做爬虫时候,会有很多公司会在这个方面进行反爬策略:就是小批量数据采集,可以允许,但是实现高并发的大量数据采集,会对其有所限制,当然,具体情况具体分析

我们这里将 task 数量设置成 100,再试一下,结果如下:
在这里插入图片描述

最后运行时间也是在 3 秒左右,当然多出来的时间就是 IO 时延了。

可见,使用了异步协程之后,我们几乎可以在相同的时间内实现成百上千倍次的网络请求,把这个运用在爬虫中,速度提升可谓是非常可观了。

3.5 与多进程结合

在最新的 PyCon 2018 上,来自 Facebook 的 John Reese 介绍了 asyncio 和 multiprocessing 各自的特点,并开发了一个新的库,叫做 aiomultiprocess。需要 Python 3.6 及更高版本才可使用。

使用这个库,我们可以将上面的例子改写如下:

import asyncio
import aiohttp
import time
from aiomultiprocess import Pool
 
start = time.time()
 
async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result
 
async def request():
    url = 'http://127.0.0.1:5000'
    urls = [url for _ in range(100)]
    async with Pool() as pool:
        result = await pool.map(get, urls)
        return result
 
coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
 
end = time.time()
print('Cost time:', end - start)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

总结

参考文档:
官方中文文档
asyncio — 异步 I/O
协程

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/352938
推荐阅读
相关标签
  

闽ICP备14008679号