赞
踩
协程不是计算机提供,程序员人为创造。协程(Coroutine)也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行。线程是通过时间片抢占来执行程序的,相比与线程的抢占式调度协程de 协作式调度是程序自身控制的切换的,协程的核心思想式主动让出和恢复。协程对比于线程减少了资源消耗。
python中的tornado、fastapi、django3都支持协程的并发
fastapi->uvicorn->awsgi->uvloop
django3->awsgi
greenlet是一个python早期使用协程的第三方的模块
安装
pip install greenlet
- from greenlet import greenlet
-
-
- def func1():
- print(1) # 第1步:输出 1
- gr2.switch() # 第3步:切换到 func2 函数
- print(2) # 第6步:输出 2
- gr2.switch() # 第7步:切换到 func2 函数,从上一次执行的位置继续向后执行
-
-
- def func2():
- print(3) # 第4步:输出 3
- gr1.switch() # 第5步:切换到 func1 函数,从上一次执行的位置继续向后执行
- print(4) # 第8步:输出 4
-
-
- gr1 = greenlet(func1)
- gr2 = greenlet(func2)
- gr1.switch() # 第1步:去执行 func1 函数
基于python生成器yield和yield from 关键词也能实现协程代码
- ef func1():
- yield 1
- yield from func2()
- yield 2
-
-
- def func2():
- yield 3
- yield 4
-
-
- f1 = func1()
- for item in f1:
- print(item)
在python3.4 出现了aysncio之后,python官方正式支持协程,ayncio比greenlet更加强大,遇到IO操作会自动切换。
- import asyncio
-
-
- @asyncio.coroutine
- def func1():
- print("func1开始.....")
- yield from asyncio.sleep(2)
- print("func1完成")
-
-
- @asyncio.coroutine
- def func2():
- print("func2开始.....")
- yield from asyncio.sleep(2)
- print("func2完成")
-
-
- # 为协程对象创建task
- tasks = [
- asyncio.ensure_future(func1()),
- asyncio.ensure_future(func2())
- ]
- # 获取事件循环
- loop = asyncio.get_event_loop()
- loop.run_until_complete(asyncio.wait(tasks))
为了让asyncio的操作更加简洁,python3.5引入了async和await关键词,
其中async修饰的函数就是一个协程的对象,而await将会等待协程对象\task对象\future对象的执行
- import asyncio
-
-
- async def func1():
- print("func1开始.....")
- await asyncio.sleep(2)
- print("func1完成")
-
-
- async def func2():
- print("func2开始.....")
- await asyncio.sleep(2)
- print("func2完成")
-
-
- # 为协程对象创建task
- tasks = [
- asyncio.ensure_future(func1()),
- asyncio.ensure_future(func2())
- ]
- # 获取事件循环
- loop = asyncio.get_event_loop()
- loop.run_until_complete(asyncio.wait(tasks))
'运行
事件循环可以把它当成一个while,这个循环一直在检查任务的状态,将未执行完成的任务执行,剔除已经执行,遇到IO任务进行任务切换。
-
-
- 任务列表 = [ 任务1, 任务2, 任务3,... ]
-
- while True:
- 可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回
-
- for 就绪任务 in 已准备就绪的任务列表:
- 执行已就绪的任务
-
- for 已完成的任务 in 已完成的任务列表:
- 在任务列表中移除 已完成的任务
-
- 如果 任务列表 中的任务都已完成,则终止循环
使用async定义的是一个协程函数,加()是并不会执行而是生成一个协程对象,并且放入到事件循环中才能执行。
放入到事件循环中有两种方式
直接加入事件循环中
- import asyncio
-
- async def func():
- print("正在执行....")
-
- # loop = asyncio.get_event_loop()
- # loop.run_until_complete( func() )
- asyncio.run(func())
构造成一个task放入
- import asyncio
-
-
- async def func():
- print("正在执行....1")
-
- tasks = [asyncio.ensure_future(func())]
- loop = asyncio.get_event_loop()
- loop.run_until_complete(asyncio.wait(tasks))
await + 可等待的对象(协程对象, Future, Task)
协程对象
- # 协程对象1
- import asyncio
-
- async def func():
- print("开始执行func1")
- await asyncio.sleep(2)
- print("结束执行func1")
-
- asyncio.run(func())
-
- # 协程对象2
- import asyncio
-
-
- async def func1():
- print("开始func1")
- await asyncio.sleep(2)
- print('结束func1')
- return 'ok'
-
-
- async def func2():
- print("开始func2")
- response = await func1()
- print(f"结束func2:{response}")
-
-
- asyncio.run(func2())
- asyncio.run( func() )
Futrue对象
- # Future对象
- import asyncio
-
-
- async def execute_fut(fut):
- print("开始执行fut")
- await asyncio.sleep(2)
- fut.set_result(222)
- print("结束执行fut")
-
-
- async def main():
- print("开始执行main")
- # 获取当前事件循环
- loop = asyncio.get_running_loop()
-
- # 创建一个任务Future对象
- fut = loop.create_future()
- await execute_fut(fut)
- # Future没有结果则会一直等下去。
- response = await fut
- print(response)
- print("结束执行main")
-
- asyncio.run(main())
Task对象
- import asyncio
-
-
- async def func():
- print("开始执行func")
- await asyncio.sleep(2)
- print("结束执行func")
- return "ok"
-
-
- async def main():
- print("开始执行main")
- task1 = asyncio.create_task(func())
- task2 = asyncio.create_task(func())
- response1 = await task1
- response2 = await task2
- print(response1, response2)
- print("结束执行main")
-
- asyncio.run(main())
等同于
- import asyncio
-
-
- async def func():
- print("开始执行func")
- await asyncio.sleep(2)
- print("结束执行func")
- return "ok"
-
-
- tasks = [
- asyncio.ensure_future(func()),
- asyncio.ensure_future(func())
- ]
-
- print("开始执行main")
- loop = asyncio.get_event_loop()
- loop.run_until_complete(asyncio.wait(tasks))
- print("结束执行main")
在协程中可以将多个协程对象封装成一个Task去执行,Task中的协程对象会并发执行,可以通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task()
函数以外,还可以用低层级的 loop.create_task()
或 ensure_future()
函数。
- # 协程对象串行执行
- import asyncio
-
-
- async def func():
- print("开始执行func")
- await asyncio.sleep(2)
- print("结束执行func")
- return "ok"
-
-
- async def main():
- print("开始执行main")
- response1 = await func()
- response2 = await func()
- print(response1, response2)
- print("结束执行main")
-
- asyncio.run(main())
-
- # task中的协程会并发执行
- import asyncio
-
-
- async def func():
- print("开始执行func")
- await asyncio.sleep(2)
- print("结束执行func")
- return "ok"
-
-
- async def main():
- print("开始执行main")
- task1 = asyncio.create_task(func())
- task2 = asyncio.create_task(func())
- response1 = await task1
- response2 = await task2
- print(response1, response2)
- print("结束执行main")
-
- asyncio.run(main())
对比上面两个例子,1中会串行打印开始执行func1
,而2中会同时打印开始执行func
1`
- import asyncio
-
-
- async def func():
- print("开始执行func")
- await asyncio.sleep(2)
- print("结束执行func")
- return "ok"
-
-
- async def main():
- print("开始执行main")
- task1 = asyncio.create_task(func(), name="func1")
- task2 = asyncio.create_task(func(), name="func2")
- done, pending = await asyncio.wait([task1, task2], timeout=3)
- print(done)
- print(pending)
- print("结束执行main")
-
- asyncio.run(main())
上面的代码中可以加多个Task加入到一个task_list中,并使用asyncio.wait等待结果,Task对象在超时时间未完成将出现在pending中,已经完成的将出现在done中。注意:asyncio.wait传入的是一个列表,列表中元素的Task对象。
- import asyncio
-
-
- async def func():
- print("开始执行func")
- await asyncio.sleep(2)
- print("结束执行func")
- return "ok"
-
-
- print("开始执行main")
- asyncio.run(asyncio.wait([func(), func()]))
- print("结束执行main")
'运行
代码可以简写成上面的形式直接在run中执行协程对象的列表
Task继承Future,Task对象内部await结果的处理基于Future对象来的。
实例1:
- async def main():
- # 获取当前事件循环
- loop = asyncio.get_running_loop()
-
- # 创建一个任务(Future对象),这个任务什么都不干。
- fut = loop.create_future()
-
- # 等待任务最终结果(Future对象),没有结果则会一直等下去。
- await fut
-
- asyncio.run(main())
实例2:
- import asyncio
-
-
- async def set_after(fut):
- await asyncio.sleep(2)
- fut.set_result("ok")
-
-
- async def main():
- # 获取当前事件循环
- loop = asyncio.get_running_loop()
-
- # 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
- fut = loop.create_future()
-
- # 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
- # 即手动设置future任务的最终结果,那么fut就可以结束了。
- await loop.create_task( set_after(fut) )
-
- # 等待 Future对象获取, 等到结果返回
- data = await fut
- print(data)
-
- asyncio.run( main() )
此Future是线程池或进程池中创建的Future对象,跟协程的Future无关联
- import time
- from concurrent.futures.thread import ThreadPoolExecutor
- from concurrent.futures.process import ProcessPoolExecutor
-
- # 创建线程池
- pool = ThreadPoolExecutor(max_workers=5)
- # 创建进程池
- # pool = ProcessPoolExecutor(max_workers=5)
-
-
- def func(value):
- print(f"开始执行func:{value}")
- time.sleep(3)
- print(f"执行完成func:{value}")
- return 123
-
-
- print("开始执行main")
- for i in range(10):
- print(f"创建Future:{i + 1}")
- fut = pool.submit(func, i+1)
-
-
- time.sleep(10)
- print("执行完成main")
注意: 线程池的Future可以转换成协程的Future
- import asyncio
- import requests
-
-
- async def download_image(url):
- # 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)
- print("开始下载:", url)
-
- loop = asyncio.get_event_loop()
- # requests模块默认不支持异步操作,所以就使用线程池来配合实现了。
- future = loop.run_in_executor(None, requests.get, url)
- response = await future
- print('下载完成')
- # 图片保存到本地文件
- file_name = url.rsplit('_')[-1]
- with open(file_name, mode='wb') as file_object:
- file_object.write(response.content)
-
-
- if __name__ == '__main__':
- url_list = [
- 'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
- 'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
- 'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
- ]
-
- tasks = [download_image(url) for url in url_list]
-
- loop = asyncio.get_event_loop()
- loop.run_until_complete(asyncio.wait(tasks))
如果遇到不支持的协程的模块的时候,可以使用线程或进程池的Future转化协程的Future,底层实际上还是用线程或进程实现的并发操作
实现了__enter__()和__exit__()的类为上下文管理器,而实现了__aenter__()和__aexit__()为异步上下文管理器,可以使用async with 进行上下文管理
- import asyncio
-
-
- class AsyncContextManager:
- def __init__(self):
- self.conn = conn
-
- async def do_something(self):
- # 异步操作数据库
- return 666
-
- async def __aenter__(self):
- # 异步链接数据库
- self.conn = await asyncio.sleep(1)
- return self
-
- async def __aexit__(self, exc_type, exc, tb):
- # 异步关闭数据库链接
- await asyncio.sleep(1)
-
- async def func():
- async with AsyncContextManager() as f:
- result = await f.do_something()
- print(result)
-
- asyncio.run( func() )
uvloop的协程性能强于asyncio的,可以用uvloop替代asyncio的事件循环
安装: pip install uvloop
- import asyncio
- import uvloop
- asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
-
- # 编写asyncio的代码,与之前写的代码一致。
-
- # 内部的事件循环自动化会变为uvloop
- asyncio.run(...)
安装: pip install aioredis
- import asyncio
- import aioredis
- import time
-
-
- class RedisArray(object):
- def __init__(self, url="redis://192.168.1.200", kname="my_list"):
- """
- :param url: redis url
- :param kname: list name
- """
- self.url = url
- self.kname = kname
-
- async def create_pool(self):
- """
- 创建连接池
- """
- self.aioClient = await aioredis.from_url(self.url)
-
- async def lpush(self, kvalue):
- """
- 往列表左边, 也就是头部插入元素
- :param kvalue: element value
- """
- try:
- # 插入加入延时3秒
- await asyncio.sleep(3)
- return await self.aioClient.lpush(self.kname, kvalue)
- except Exception as e:
- print(f"lpush 插入: {self.kname}/{kvalue}报错: {str(e)}")
- return None
-
- async def rpush(self, kvalue):
- """
- 往列表右边, 也就是尾部插入元素
- :param kvalue: element value
- """
- try:
- # 插入加入延时3秒
- await asyncio.sleep(3)
- return await self.aioClient.rpush(self.kname, kvalue)
- except Exception as e:
- print(f"rpush 插入: {self.kname}/{kvalue}报错: {str(e)}")
- return None
-
- async def get(self, start=0, end=-1):
- """
- 获取元素
- :param start: element start index
- :param end: element end index
- """
- try:
- # 获取加入延时3秒
- await asyncio.sleep(3)
- result = await self.aioClient.lrange(self.kname, 0, -1)
- result = [str(item, 'utf-8') for item in result]
-
- print(result)
- return result
- except Exception as e:
- print(f"读取: {self.kname}/{start}/{end}错误: {str(e)}")
- return []
-
- async def close(self):
- """
- 关闭redis连接
- """
- await self.aioClient.close()
-
-
- async def main():
- start_time = time.time()
- # 使用异步事件循环执行插入操作
- RA = RedisArray()
- await RA.create_pool()
- await RA.lpush("test22")
- await RA.rpush("test23")
- task1 = asyncio.ensure_future(RA.get())
- task2 = asyncio.ensure_future(RA.get())
- await task1
- await task2
- await RA.close()
- print(f"操作耗时:{time.time()-start_time}")
-
- loop = asyncio.get_event_loop()
- loop.run_until_complete(main())
- # 总耗时9秒
示例中lpush,rpush,get函数中分别都加了3秒的延迟,但是总的耗时只有9秒,说明get数据是异步的。
- import time
- import aiomysql
- import asyncio
-
-
- async def select():
- conn = await aiomysql.connect(host='192.168.1.200', port=3306, user='root', password='123456', db='test')
- sql = "select * from users"
- cursor = await conn.cursor()
- await asyncio.sleep(3)
- await cursor.execute(sql)
- result = await cursor.fetchall()
- await cursor.close()
- conn.close()
- return result
-
-
- async def main():
- start_time = time.time()
- task1 = asyncio.ensure_future(select())
- task2 = asyncio.ensure_future(select())
- result1 = await task1
- result2 = await task2
- print(result1)
- print(result2)
- print(f"执行耗时:{time.time() - start_time}")
-
- asyncio.run(main())
- # 总耗时3秒
在select函数中增加耗时3秒。main函数中两次查询MySQL的数据总共耗时3秒, 说明两次查询时异步的。
- import asyncio
-
- import uvicorn
- import aioredis
- from aioredis import Redis
- from fastapi import FastAPI
-
- app = FastAPI()
-
- # 创建一个redis连接池
- redis = aioredis.from_url('redis://192.168.1.200:6379')
-
-
- @app.get("/")
- def index():
- """ 普通操作接口 """
- return {"message": "Hello World"}
-
-
- @app.get("/red")
- async def red():
- """ 异步操作接口 """
-
- print("请求来了")
-
- await asyncio.sleep(3)
- # 连接池获取一个连接
-
- async with redis.client() as conn:
- await conn.set("my-key", "value")
- result = await conn.get("my-key")
- print(result)
- return result
-
-
- if __name__ == '__main__':
- uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")
- import aiohttp
- import asyncio
-
-
- async def fetch(session, url):
- print("发送请求:", url)
- async with session.get(url, verify_ssl=False) as response:
- text = await response.text()
- print("得到结果:", url, len(text))
- return text
-
-
- async def main():
- async with aiohttp.ClientSession() as session:
- url_list = [
- 'https://www.baidu.com',
- 'https://www.baidu.com',
- 'https://www.baidu.com'
- ]
- tasks = [ asyncio.create_task(fetch(session, url)) for url in url_list]
-
- done,pending = await asyncio.wait(tasks)
-
-
- if __name__ == '__main__':
- asyncio.run( main() )
- # socket 服务端
-
- import socket
-
- s = socket.socket()
- port = 2324
- s.bind(("127.0.0.1", port))
-
- s.listen(5)
- while True:
- c, addr = s.accept()
- print('连接地址:', addr)
- c.send("欢迎进入!".encode(encoding="gbk"))
- while True:
- # 当多个客户端来连接的时候将阻塞在这里
- data = c.recv(1024)
- print("收到:", data.decode("gbk"))
- c.send(data)
- c.close()
-
- # socket 客户福
-
- import socket
-
- s = socket.socket()
- port = 2324
-
- s.connect(("127.0.0.1", port))
- while True:
- msg = s.recv(1024)
- print(msg.decode(encoding="gbk"))
- data = input()
- s.send(data.encode(encoding="gbk"))
这个代码只适合一个客户端和一个服务端的通信,因为当服务端接收到客服端的请求执行accept到下一个while的循环里面时会一直阻塞状态,无法接收其他的客户端的请求,想要实现多个客户端通信,并且在服务端对每一个连接开启线程。而使用aysncio则无需如此
- import asyncio
-
-
- async def script_handle(reader, writer):
- while True:
- data = await asyncio.wait_for(reader.readline(), None)
- if not data:
- print('client disconnected')
- writer.close() # 关闭套接字
- await writer.wait_closed() # 等待套接字完全关闭
- return
- print("收到:", data.decode())
- writer.write(b'>'+data)
- await writer.drain()
-
-
- async def main():
- server = await asyncio.start_server(script_handle, host='', port=2324)
- addr = server.sockets[0].getsockname()
- print(f'Serving on {addr}')
- async with server:
- await server.serve_forever()
-
-
- if __name__ == '__main__':
- asyncio.run(main())
在cmd窗口中输入telnent 127.0.0.1 2324
如图所示,当多个客户端去连接的时候,服务端会使用协程异步接收请求、处理消息
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。