赞
踩
其实爬虫的本质就是client发请求批量获取server的响应数据,如果我们有多个url待爬取,只用一个线程且采用串行的方式执行,那只能等待爬取一个结束后才能继续下一个,效率会非常低。需要强调的是:对于单线程下串行N个任务,并不完全等同于低效,如果这N个任务都是纯计算的任务,那么该线程对cpu的利用率仍然会很高,之所以单线程下串行多个爬虫任务低效,是因为爬虫任务是明显的IO密集型(阻塞)程序。那么该如何提高爬取性能呢?
同步调用:即提交一个任务后就在原地等待任务结束,等到拿到任务的结果后再继续下一行代码,效率低下
import requests
def parse_page(res):
print('解析 %s' %(len(res)))
def get_page(url):
print('下载 %s' %url)
response=requests.get(url)
if response.status_code == 200:
return response.text
urls = ['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
for url in urls:
res=get_page(url) # 调用一个任务,就在原地等待任务结束拿到结果后才继续往后执行
parse_page(res)
解决方案
上述无论哪种解决方案其实没有解决一个性能相关的问题:IO阻塞,无论是多进程还是多线程,在遇到IO阻塞时都会被操作系统强行剥夺走CPU的执行权限,程序的执行效率因此就降低了下来
解决这一问题的关键在于,我们自己从应用程序级别检测IO阻塞然后切换到我们自己程序的其他任务执行,这样把我们程序的IO降到最低,我们的程序处于就绪态就会增多,以此来迷惑操作系统,操作系统便以为我们的程序是IO比较少的程序,从而会尽可能多的分配CPU给我们,这样也就达到了提升程序执行效率的目的。
在python3.4之后新增了asyncio模块,可以帮我们检测IO(只能是网络IO【HTTP连接就是网络IO操作】),实现应用程序级别的切换(异步IO)。注意:asyncio只能发tcp级别的请求,不能发http协议。
异步IO:所谓「异步 IO」,就是你发起一个 网络IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。
协程本质上是个单进程,协程相对于多进程来说,无需线程上下文切换的开销,无需原子操作锁定及同步的开销,编程模型也非常简单。
我们可以使用协程来实现异步操作,比如在网络爬虫场景下,我们发出一个请求之后,需要等待一定的时间才能得到响应,但其实在这个等待过程中,程序可以干许多其他的事情,等到响应得到之后才切换回来继续处理,这样可以充分利用 CPU 和其他资源,这就是异步协程的优势
接下来让我们来了解下协程的实现,从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的,在 Python 3.5 则增加了 async/await,使得协程的实现更加方便
Python 中使用协程最常用的库莫过于 asyncio,所以本文会以 asyncio 为基础来介绍协程的使用
首先我们需要了解下面几个概念:
pip insatll greenlet
from greenlet import greenlet def func1(): print(1) # 第一步 gr2.switch() # 第二步切换到 func2 函数 print(2) # 第六步 gr2.switch() # 第七步切换到 func2,从上次执行位置执行 def func2(): print(3) # 第三步 gr1.switch() # 第五步,切换到 func1 上次执行的位置 print(4) gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch() # 执行 func1
def func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
f1 = func1()
for item in f1:
print(item)
在 python3.5 之后的版本支持
import asyncio @asyncio.coroutine def func1(): print(1) # 网络 IO 请求 yield from asyncio.sleep(2) # 遇到 IO 耗时操作,自动切换 print(2) @asyncio.coroutine def func2(): print(3) # 网络 IO 请求 yield from asyncio.sleep(2) # 遇到 IO 耗时操作,自动切换 print(4) tasks = [ asyncio.ensure_future(func1()), asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
遇到 IO 阻塞自动切换
import asyncio async def func1(): print(1) # 网络 IO 请求 await asyncio.sleep(2) # 遇到 IO 耗时操作,自动切换 print(2) async def func2(): print(3) # 网络 IO 请求 await asyncio.sleep(2) # 遇到 IO 耗时操作,自动切换 print(4) tasks = [ asyncio.ensure_future(func1()), asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
在一个线程中如果遇到 IO 等待时间,线程不会等待,而是利用空闲时间再去干点其他事情
案例:
普通方式:同步方式
下载三张图片
from requests import get from time import time def download_img(url): name = url.split("/")[-1] print(f"开始下载{name}") resp = get(url=url) with open(f"./img/{name}", "wb") as f: f.write(resp.content) print("下载完成") if __name__ == '__main__': sta = time() url = [ "http://kr.shanghai-jiuxin.com/file/mm/20211129/qgenlhwzyvs.jpg", "http://kr.shanghai-jiuxin.com/file/2020/0223/c0b455b1f25dec71d995550b2e9f898e.jpg", "http://kr.shanghai-jiuxin.com/file/2020/0223/8e3674af90cba4a3fcfcfce30ab9e5b3.jpg", ] for i in url: download_img(i) print(f"总用时:{(time() - sta):.2}")
协程方式:异步方式
下载三张图片
import asyncio, aiohttp, aiofile from time import time async def fetch(session, url): name = url.split("/")[-1] print(f"开始下载{name}") async with session.get(url, verify_ssl=False) as resp: resp = await resp.content.read() async with aiofile.async_open(f"./img/{name}", "wb") as f: await f.write(resp) print("下载完成") async def main(): async with aiohttp.ClientSession() as session: urls = [ "http://kr.shanghai-jiuxin.com/file/mm/20211129/qgenlhwzyvs.jpg", "http://kr.shanghai-jiuxin.com/file/2020/0223/c0b455b1f25dec71d995550b2e9f898e.jpg", "http://kr.shanghai-jiuxin.com/file/2020/0223/8e3674af90cba4a3fcfcfce30ab9e5b3.jpg", ] tasks = [asyncio.create_task(fetch(session, url)) for url in urls] await asyncio.wait(tasks) if __name__ == '__main__': sta = time() asyncio.run(main()) print(f"总用时:{(time() - sta):.2}")
理解为一个死循环,去检测并执行代码
任务列表 = [任务1, 任务2, 任务3, ···]
while True:
可执行任务列表, 已完成任务列表 = 去任务列表中检查所有的任务,将 “可执行” 和 “已完成” 的任务返回
for 就绪任务 in 已准备就行的任务列表:
执行已就绪的任务
for 已完成的任务 in 已完成的任务列表:
任务列表中移除已完成的任务
如果任务列表中的任务都已完成,则终止循环
import asyncio
# 去生成一个事件循环
loop = asyncio.get_event_loop()
# 将任务添加到任务列表
loop.run_until_complete(tasks)
协程函数,定义函数时候,async def 函数名()
协程对象,执行协程函数,得到对象
async def func(): # 协程函数
pass
result = func() # 其为协程对象,函数内部代码不会执行
执行协程函数
如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理
import asyncio
async def func():
print("你好")
result = func()
# loop = asyncio.get_event_loop()
# loop.run_until_complete(result)
asyncio.run(result) # 简便写法
事件循环简便的写法
asyncio.run(result) # python 3.7 以后才有
- 1
await + 可等待的对象(协程对象,Future,Task对象 -> IO 等待)
实例1:
import asyncio
async def func():
print("你好")
await asyncio.sleep(2)
print("Hello")
asyncio.run(func()) # 传入协程对象
实例2:
import asyncio
async def func():
print("你好")
await asyncio.sleep(2)
print("Hello")
return 2
async def func2():
print("函数开始执行")
resp = await func() # 添加协程对象
print(resp)
asyncio.run(func2()) # 传入协程对象
实例3:
import asyncio async def func(): print("你好") await asyncio.sleep(2) print("Hello") return 2 async def func2(): print("函数开始执行") resp = await func() # 添加协程对象 print("1:", resp) resp = await func() print("2:", resp) asyncio.run(func2()) # 传入协程对象
await 就是等待对象的值得到结果之后再继续执行下去
实例4
import asyncio, time async def func(): print("你好") await asyncio.sleep(2) print("Hello") return 2 async def func2(): print("函数开始执行") resp = await func() # 添加协程对象 print(f"{time.ctime()}:", resp) async def main(): tasks = [asyncio.create_task(func2()) for i in range(3)] await asyncio.wait(tasks) asyncio.run(main()) # 传入协程对象
在事件循环中添加多个任务的
asyncio.create_task(协程对象)
的方式创建 Task 对象,这样可以让协程加入事件循环中等待被调度执行,除了使用asyncio.create_task(协程对象)
函数以外,还可以使用低层级的loop.create_task()
或ensure_future()
函数,不建议手动实例化 Task 对象asyncio.create_task()
是 python3.7 中加入的示例1:
import asyncio async def func(): print("你好") await asyncio.sleep(2) print("Hello") return 2 async def main(): # 创建两个 Task 对象,将当前执行 func 函数任务添加到事件循环 tasks = [asyncio.create_task(func(), name=f"n{i}") for i in range(3)] # name 参数设置 Task 对象名称 # 当某协程遇到 IO 操作时,会自动化切换到其他任务 # 此处的 await 是等待相对应的协程全部都执行完毕并获取结果 done, pending = await asyncio.wait(tasks, timeout=None) print(list(done)[0].result()) # 获取返回值 asyncio.run(main()) # 传入协程对象
示例2:
import asyncio
async def func():
print("你好")
await asyncio.sleep(2)
print("Hello")
return 2
# 报错,没有添加事件循环
# tasks = [asyncio.create_task(func(), name=f"n{i}") for i in range(3)]
tasks = [func() for i in range(3)] # 只能通过这样完成
done, pending = asyncio.run(asyncio.wait(tasks))
print(done)
Task 继承 Future ,Task 对象内部 await 结果的处理基于 Future 对象来的
示例1:
async def main():
# 获取当前事件
loop = asyncio.get_running_loop()
# 创建一个任务,(Future对象),这个任务什么也不干
fut = loop.create_future()
# 等待任务最终结果(Future对象),没有结果会一直等待下去
await fut
asyncio.run(main())
示例2:
import asyncio async def set_after(fut): await asyncio.sleep(2) fut.set_result("666") async def main(): # 获取当前事件 loop = asyncio.get_running_loop() # 创建一个任务,(Future对象),这个任务什么也不干 fut = loop.create_future() # 创建一个任务(Task对象),绑定了 set_after 函数,函数内部在 2s 之后,会给 fut 赋值 # 即手动设置 Future任务的最终结果,那么 fut 就结束了 await loop.create_task(set_after(fut)) # 等待 Future对象获取最终结果,否则一直等待下去 data = await fut print(data) asyncio.run(main())
使用线程池和进程池实现异步操作时用到的对象
from concurrent.futures import Future, ThreadPoolExecutor, ProcessPoolExecutor from time import sleep def func(val): sleep(1) return val pool = ThreadPoolExecutor(max_workers=5) for i in range(10): fut = pool.submit(func, i) print(fut.result()) # 利用future对象,得到返回值 pool.shutdown()
以后写代码可能会存在交叉情况,例如:crm项目 80% 都是基于协程同步编程 + MySQL(不支持)【线程、进程做异步编程】
import time, asyncio, concurrent.futures def func1(): # 某个耗时操作 time.sleep(2) return "11" async def main(): loop = asyncio.get_running_loop() """ 1. Run in the default loop's executor: 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行 func1 函数,并返回一个 concurrent.futures.Future 对象 第二步:调用 asyncio.wrap_future 将 concurrent.futures.Future 对象包装为 asyncio.Futures 对象 因为 concurrent.futures.Future 对象不支持 await 语法,所以包装为 asyncio.Futures 对象才能使用 """ fut = loop.run_in_executor(None, func1) result = await fut print("default thread pool", result) """ 2. Run in a custom thread pool: with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor( pool, func1 ) print("custom thread pool", result) 3. Run in a custom process pool: with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor( pool, func1 ) print("custom process pool", result) """ asyncio.run(main())
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
异步和同步结合使用
import asyncio, requests async def download_img(url): # 发送网络请求,下载图片 print("开始下载:", url) loop = asyncio.get_event_loop() # requests模块默认不支持异步操作,所以使用线程池配合实现 fut = loop.run_in_executor(None, requests.get, url) resp = await fut print("下载完成") # 图片保存到本地 name = url.split("/")[-1] with open(name, "wb") as f: f.write(resp.content) if __name__ == "__main__": urls = [ "http://kr.shanghai-jiuxin.com/file/mm/20211129/qgenlhwzyvs.jpg", "http://kr.shanghai-jiuxin.com/file/2020/0223/c0b455b1f25dec71d995550b2e9f898e.jpg", "http://kr.shanghai-jiuxin.com/file/2020/0223/8e3674af90cba4a3fcfcfce30ab9e5b3.jpg", ] tasks = [download_img(url) for url in urls] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
什么是异步迭代器?
异步迭代器:实现了__aiter__()和__anext__()方法的对象,必须返回一个awaitable对象。async_for支持处理异步迭代器的对象
__anext__()方法返回的可等待对象,直到引发一个stopAsyncIteration异常,这个改动由PEP 492引入
异步可迭代对象:可在async_for语句中被使用的对象,必须通过它的__aiter__()方法返回一个asynchronous_iterator(异步迭代器). 这个改动由PEP 492引入
import asyncio class Iterator: """自定义异步迭代器(异步迭代器对象)""" def __init__(self): self.count = 0 async def readlines(self): # await asyncio.sleep(1) self.count += 1 if self.count == 100: return None return self.count def __aiter__(self): return self async def __anext__(self): val = await self.self.readlines() if val == None: raise StopAsyncIteration return val async def func(): obj = Iterator() async for i in obj: print(i) asyncio.run(func())
不能直接写在普通方法或者暴露在外面。必须写在协程函数,任意协程函数均可
此种对象通过定义__aenter__()和__aexit__()方法来对async_with 语句中的环境进行控制,由PEP 492引入
import asyncio class AsyncContextManager: def __init__(self): # 打开数据库 self.conn = conn async def do_something(self): # 异步操作数据库 return 666 async def __aenter__(self): # 异步链接数据库 self.conn = await asyncio.sleep(1) return self async def __aexit__(self, exc_type, exc, tb): # 异步关闭数据库连接 await asyncio.sleep(1) async def func(): async with AsyncContextManager() as f: result = f.do_something() print(result) asyncio.run(func())
其是 asyncio 事件循环的替换方案,事件循环效率 > 默认的 asyncio 的循环事件
安装:
pip install uvloop
语法:
import asyncio, uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # 重新设置循环事件
# 编写 asyncio 的代码,与之前的代码一致
# 内部的事件循环自动会变为 uvloop
asyncio.run()
在使用 python 代码操作 redis 时,链接 / 操作 / 断开,都是网络 IO
安装模块
pip install aioredis
实例:
import asyncio, aioredis
async def main():
redis = await aioredis.from_url('redis://192.168.45.129:6379')
await redis.set("my-key", "value")
value = await redis.get("my-key")
print(value)
asyncio.run(main())
安装模块
pip install aiomysql
实例:
import aiomysql, asyncio async def execute(db_config): print("连接成功") conn = await aiomysql.connect(**db_config) # 创建游标对象 cur = await conn.cursor() # 网络 IO 请求:执行 SQL await cur.execute("SELECT HOST, User, FROM user") # 获取 SQL 结果 ret = await cur.fetchall() print(ret) conn.close() db_config = { "host": "192.168.45.129", "port": 3306, "user": "admin", "password": "qwe123", "db": "user", # 指定操作的数据库 "charset": "utf8" } asyncio.run(execute(db_config))
环境配置
pip install fastapi
pip install uvicorn # 支持异步,内部基于 uvloop
实例:
import uvicorn, asyncio, aioredis from aioredis import Redis from fastapi import FastAPI app = FastAPI() REDIS_POOL = aioredis.ConnectionsPool("redis://47.193.14.198:6379", password="root123", minsize=1, maxsize=10) @app.get("/") def index(): """普通窗口""" # 某个 IO 操作 10s return {"message": "Hello World"} @app.get("/red") def red(): "异步操作接口" print("接收请求") await asyncio.sleep(3) # 连接池获取一个连接 conn = await REDIS_POOL.acquire() redis = Redis(conn) # 设置值 await redis.hmset_dict("car", key1=1, key2=2, key3=3) # 读取值 ret= await redis.hgetall("car", encoding="utf-8") print(ret) # 连接归还连接池 REDIS_POOL.release(conn) return result if __name__ == "__main__": # luffy 为 pythton 文件名称 uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")
import asyncio, aiohttp, aiofile from time import time async def fetch(session, url): name = url.split("/")[-1] print(f"开始下载{name}") async with session.get(url, verify_ssl=False) as resp: resp = await resp.content.read() async with aiofile.async_open(f"./img/{name}", "wb") as f: await f.write(resp) print("下载完成") async def main(): async with aiohttp.ClientSession() as session: urls = [ "http://kr.shanghai-jiuxin.com/file/mm/20211129/qgenlhwzyvs.jpg", "http://kr.shanghai-jiuxin.com/file/2020/0223/c0b455b1f25dec71d995550b2e9f898e.jpg", "http://kr.shanghai-jiuxin.com/file/2020/0223/8e3674af90cba4a3fcfcfce30ab9e5b3.jpg", ] tasks = [asyncio.create_task(fetch(session, url)) for url in urls] done, pending = await asyncio.wait(tasks) if __name__ == '__main__': sta = time() asyncio.run(main()) print(f"总用时:{(time() - sta):.2}")
最大的意义:通过一个线程利用其 IO 等待时间去做其他的事情
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。