当前位置:   article > 正文

python异步编程_python 异步编程

python 异步编程

1、协程

1.1 协程是什么

协程不是计算机提供,程序员人为创造。协程(Coroutine)也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行。线程是通过时间片抢占来执行程序的,相比与线程的抢占式调度协程de 协作式调度是程序自身控制的切换的,协程的核心思想式主动让出和恢复。协程对比于线程减少了资源消耗。

1.2 协程的发展

python中的tornado、fastapi、django3都支持协程的并发

fastapi->uvicorn->awsgi->uvloop

django3->awsgi

1.2.1 greenlet

greenlet是一个python早期使用协程的第三方的模块

安装

pip install greenlet

  1. from greenlet import greenlet
  2. def func1():
  3. print(1) # 第1步:输出 1
  4. gr2.switch() # 第3步:切换到 func2 函数
  5. print(2) # 第6步:输出 2
  6. gr2.switch() # 第7步:切换到 func2 函数,从上一次执行的位置继续向后执行
  7. def func2():
  8. print(3) # 第4步:输出 3
  9. gr1.switch() # 第5步:切换到 func1 函数,从上一次执行的位置继续向后执行
  10. print(4) # 第8步:输出 4
  11. gr1 = greenlet(func1)
  12. gr2 = greenlet(func2)
  13. gr1.switch() # 第1步:去执行 func1 函数
1.2.2 yield

基于python生成器yield和yield from 关键词也能实现协程代码

  1. ef func1():
  2. yield 1
  3. yield from func2()
  4. yield 2
  5. def func2():
  6. yield 3
  7. yield 4
  8. f1 = func1()
  9. for item in f1:
  10. print(item)
1.2.3 asyncio

在python3.4 出现了aysncio之后,python官方正式支持协程,ayncio比greenlet更加强大,遇到IO操作会自动切换。

  1. import asyncio
  2. @asyncio.coroutine
  3. def func1():
  4. print("func1开始.....")
  5. yield from asyncio.sleep(2)
  6. print("func1完成")
  7. @asyncio.coroutine
  8. def func2():
  9. print("func2开始.....")
  10. yield from asyncio.sleep(2)
  11. print("func2完成")
  12. # 为协程对象创建task
  13. tasks = [
  14. asyncio.ensure_future(func1()),
  15. asyncio.ensure_future(func2())
  16. ]
  17. # 获取事件循环
  18. loop = asyncio.get_event_loop()
  19. loop.run_until_complete(asyncio.wait(tasks))
1.2.4 async & await

为了让asyncio的操作更加简洁,python3.5引入了async和await关键词,

其中async修饰的函数就是一个协程的对象,而await将会等待协程对象\task对象\future对象的执行

  1. import asyncio
  2. async def func1():
  3. print("func1开始.....")
  4. await asyncio.sleep(2)
  5. print("func1完成")
  6. async def func2():
  7. print("func2开始.....")
  8. await asyncio.sleep(2)
  9. print("func2完成")
  10. # 为协程对象创建task
  11. tasks = [
  12. asyncio.ensure_future(func1()),
  13. asyncio.ensure_future(func2())
  14. ]
  15. # 获取事件循环
  16. loop = asyncio.get_event_loop()
  17. loop.run_until_complete(asyncio.wait(tasks))
'
运行

2、协程中的定义

2.1 事件循环

事件循环可以把它当成一个while,这个循环一直在检查任务的状态,将未执行完成的任务执行,剔除已经执行,遇到IO任务进行任务切换。

  1. 任务列表 = [ 任务1, 任务2, 任务3,... ]
  2. while True:
  3. 可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行''已完成'的任务返回
  4. for 就绪任务 in 已准备就绪的任务列表:
  5. 执行已就绪的任务
  6. for 已完成的任务 in 已完成的任务列表:
  7. 在任务列表中移除 已完成的任务
  8. 如果 任务列表 中的任务都已完成,则终止循环

2.2 协程对象

使用async定义的是一个协程函数,加()是并不会执行而是生成一个协程对象,并且放入到事件循环中才能执行。

放入到事件循环中有两种方式

直接加入事件循环中

  1. import asyncio
  2. async def func():
  3.    print("正在执行....")
  4. # loop = asyncio.get_event_loop()
  5. # loop.run_until_complete( func() )
  6. asyncio.run(func())

构造成一个task放入

  1. import asyncio
  2. async def func():
  3.    print("正在执行....1")
  4. tasks = [asyncio.ensure_future(func())]
  5. loop = asyncio.get_event_loop()
  6. loop.run_until_complete(asyncio.wait(tasks))

2.3 await

await + 可等待的对象(协程对象, Future, Task)

协程对象

  1. # 协程对象1
  2. import asyncio
  3. async def func():
  4.    print("开始执行func1")
  5. await asyncio.sleep(2)
  6.    print("结束执行func1")
  7. asyncio.run(func())
  1. # 协程对象2
  2. import asyncio
  3. async def func1():
  4. print("开始func1")
  5. await asyncio.sleep(2)
  6. print('结束func1')
  7. return 'ok'
  8. async def func2():
  9. print("开始func2")
  10. response = await func1()
  11. print(f"结束func2:{response}")
  12. asyncio.run(func2())
  13. asyncio.run( func() )

Futrue对象

  1. # Future对象
  2. import asyncio
  3. async def execute_fut(fut):
  4.    print("开始执行fut")
  5.    await asyncio.sleep(2)
  6.    fut.set_result(222)
  7.    print("结束执行fut")
  8. async def main():
  9.    print("开始执行main")
  10.    # 获取当前事件循环
  11.    loop = asyncio.get_running_loop()
  12.    # 创建一个任务Future对象
  13.    fut = loop.create_future()
  14.    await execute_fut(fut)
  15.    # Future没有结果则会一直等下去。
  16.    response = await fut
  17.    print(response)
  18.    print("结束执行main")
  19. asyncio.run(main())

Task对象

  1. import asyncio
  2. async def func():
  3.    print("开始执行func")
  4.    await asyncio.sleep(2)
  5.    print("结束执行func")
  6.    return "ok"
  7. async def main():
  8.    print("开始执行main")
  9.    task1 = asyncio.create_task(func())
  10.    task2 = asyncio.create_task(func())
  11.    response1 = await task1
  12.    response2 = await task2
  13.    print(response1, response2)
  14.    print("结束执行main")
  15. asyncio.run(main())

等同于

  1. import asyncio
  2. async def func():
  3.    print("开始执行func")
  4.    await asyncio.sleep(2)
  5.    print("结束执行func")
  6.    return "ok"
  7. tasks = [
  8.    asyncio.ensure_future(func()),
  9.    asyncio.ensure_future(func())
  10. ]
  11. print("开始执行main")
  12. loop = asyncio.get_event_loop()
  13. loop.run_until_complete(asyncio.wait(tasks))
  14. print("结束执行main")

2.4 Task对象

在协程中可以将多个协程对象封装成一个Task去执行,Task中的协程对象会并发执行,可以通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task()ensure_future() 函数。

  1. # 协程对象串行执行
  2. import asyncio
  3. async def func():
  4.    print("开始执行func")
  5.    await asyncio.sleep(2)
  6.    print("结束执行func")
  7.    return "ok"
  8. async def main():
  9.    print("开始执行main")
  10.    response1 = await func()
  11.    response2 = await func()
  12.    print(response1, response2)
  13.    print("结束执行main")
  14. asyncio.run(main())
  1. # task中的协程会并发执行
  2. import asyncio
  3. async def func():
  4.    print("开始执行func")
  5.    await asyncio.sleep(2)
  6.    print("结束执行func")
  7.    return "ok"
  8. async def main():
  9.    print("开始执行main")
  10.    task1 = asyncio.create_task(func())
  11.    task2 = asyncio.create_task(func())
  12.    response1 = await task1
  13.    response2 = await task2
  14.    print(response1, response2)
  15.    print("结束执行main")
  16. asyncio.run(main())

对比上面两个例子,1中会串行打印开始执行func1,而2中会同时打印开始执行func1`

  1. import asyncio
  2. async def func():
  3.    print("开始执行func")
  4.    await asyncio.sleep(2)
  5.    print("结束执行func")
  6.    return "ok"
  7. async def main():
  8.    print("开始执行main")
  9.    task1 = asyncio.create_task(func(), name="func1")
  10.    task2 = asyncio.create_task(func(), name="func2")
  11.    done, pending = await asyncio.wait([task1, task2], timeout=3)
  12.    print(done)
  13.    print(pending)
  14.    print("结束执行main")
  15. asyncio.run(main())
 

上面的代码中可以加多个Task加入到一个task_list中,并使用asyncio.wait等待结果,Task对象在超时时间未完成将出现在pending中,已经完成的将出现在done中。注意:asyncio.wait传入的是一个列表,列表中元素的Task对象。

  1. import asyncio
  2. async def func():
  3.    print("开始执行func")
  4.    await asyncio.sleep(2)
  5.    print("结束执行func")
  6.    return "ok"
  7. print("开始执行main")
  8. asyncio.run(asyncio.wait([func(), func()]))
  9. print("结束执行main")
'
运行
代码可以简写成上面的形式直接在run中执行协程对象的列表

2.5 asyncio.Future对象

Task继承Future,Task对象内部await结果的处理基于Future对象来的。

实例1:

  1. async def main():
  2.    # 获取当前事件循环
  3.    loop = asyncio.get_running_loop()
  4.    # 创建一个任务(Future对象),这个任务什么都不干。
  5.    fut = loop.create_future()
  6.    # 等待任务最终结果(Future对象),没有结果则会一直等下去。
  7.    await fut
  8. asyncio.run(main())

实例2:

  1. import asyncio
  2. async def set_after(fut):
  3.    await asyncio.sleep(2)
  4.    fut.set_result("ok")
  5. async def main():
  6.    # 获取当前事件循环
  7.    loop = asyncio.get_running_loop()
  8.    # 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
  9.    fut = loop.create_future()
  10.    # 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
  11.    # 即手动设置future任务的最终结果,那么fut就可以结束了。
  12.    await loop.create_task(  set_after(fut) )
  13.    # 等待 Future对象获取, 等到结果返回
  14.    data = await fut
  15.    print(data)
  16. asyncio.run( main() )

2.6 concurrent 中的Future对象

此Future是线程池或进程池中创建的Future对象,跟协程的Future无关联

  1. import time
  2. from concurrent.futures.thread import ThreadPoolExecutor
  3. from concurrent.futures.process import ProcessPoolExecutor
  4. # 创建线程池
  5. pool = ThreadPoolExecutor(max_workers=5)
  6. # 创建进程池
  7. # pool = ProcessPoolExecutor(max_workers=5)
  8. def func(value):
  9.    print(f"开始执行func:{value}")
  10.    time.sleep(3)
  11.    print(f"执行完成func:{value}")
  12.    return 123
  13. print("开始执行main")
  14. for i in range(10):
  15.    print(f"创建Future:{i + 1}")
  16.    fut = pool.submit(func, i+1)
  17. time.sleep(10)
  18. print("执行完成main")

注意: 线程池的Future可以转换成协程的Future

  1. import asyncio
  2. import requests
  3. async def download_image(url):
  4.    # 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)
  5.    print("开始下载:", url)
  6.    loop = asyncio.get_event_loop()
  7.    # requests模块默认不支持异步操作,所以就使用线程池来配合实现了。
  8.    future = loop.run_in_executor(None, requests.get, url)
  9.    response = await future
  10.    print('下载完成')
  11.    # 图片保存到本地文件
  12.    file_name = url.rsplit('_')[-1]
  13.    with open(file_name, mode='wb') as file_object:
  14.        file_object.write(response.content)
  15. if __name__ == '__main__':
  16.    url_list = [
  17.        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
  18.        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
  19.        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
  20.   ]
  21.    tasks = [download_image(url) for url in url_list]
  22.    loop = asyncio.get_event_loop()
  23.    loop.run_until_complete(asyncio.wait(tasks))

​如果遇到不支持的协程的模块的时候,可以使用线程或进程池的Future转化协程的Future,底层实际上还是用线程或进程实现的并发操作

2.7 异步上下文

实现了__enter__()和__exit__()的类为上下文管理器,而实现了__aenter__()和__aexit__()为异步上下文管理器,可以使用async with 进行上下文管理

  1. import asyncio
  2. class AsyncContextManager:
  3. def __init__(self):
  4.        self.conn = conn
  5.        
  6.    async def do_something(self):
  7.        # 异步操作数据库
  8.        return 666
  9.    async def __aenter__(self):
  10.        # 异步链接数据库
  11.        self.conn = await asyncio.sleep(1)
  12.        return self
  13.    async def __aexit__(self, exc_type, exc, tb):
  14.        # 异步关闭数据库链接
  15. await asyncio.sleep(1)
  16. async def func():
  17.    async with AsyncContextManager() as f:
  18.        result = await f.do_something()
  19.        print(result)
  20. asyncio.run( func() )

2.8 uvloop

uvloop的协程性能强于asyncio的,可以用uvloop替代asyncio的事件循环

安装: pip install uvloop

  1. import asyncio
  2. import uvloop
  3. asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
  4. # 编写asyncio的代码,与之前写的代码一致。
  5. # 内部的事件循环自动化会变为uvloop
  6. asyncio.run(...)

3 案例

3.1 异步redis

安装: pip install aioredis

  1. import asyncio
  2. import aioredis
  3. import time
  4. class RedisArray(object):
  5.    def __init__(self, url="redis://192.168.1.200", kname="my_list"):
  6.        """
  7.       :param url: redis url
  8.       :param kname: list name
  9.       """
  10.        self.url = url
  11.        self.kname = kname
  12.    async def create_pool(self):
  13.        """
  14.       创建连接池
  15.       """
  16.        self.aioClient = await aioredis.from_url(self.url)
  17.    async def lpush(self, kvalue):
  18.        """
  19.       往列表左边, 也就是头部插入元素
  20.       :param kvalue: element value
  21.       """
  22.        try:
  23.            # 插入加入延时3秒
  24.            await asyncio.sleep(3)
  25.            return await self.aioClient.lpush(self.kname, kvalue)
  26.        except Exception as e:
  27.            print(f"lpush 插入: {self.kname}/{kvalue}报错: {str(e)}")
  28.            return None
  29.    async def rpush(self, kvalue):
  30.        """
  31.       往列表右边, 也就是尾部插入元素
  32.       :param kvalue: element value
  33.       """
  34.        try:
  35.            # 插入加入延时3秒
  36.            await asyncio.sleep(3)
  37.            return await self.aioClient.rpush(self.kname, kvalue)
  38.        except Exception as e:
  39.            print(f"rpush 插入: {self.kname}/{kvalue}报错: {str(e)}")
  40.            return None
  41.    async def get(self, start=0, end=-1):
  42.        """
  43.       获取元素
  44.       :param start: element start index
  45.       :param end: element end index
  46.       """
  47.        try:
  48.            # 获取加入延时3秒
  49.            await asyncio.sleep(3)
  50.            result = await self.aioClient.lrange(self.kname, 0, -1)
  51.            result = [str(item, 'utf-8') for item in result]
  52.            print(result)
  53.            return result
  54.        except Exception as e:
  55.            print(f"读取: {self.kname}/{start}/{end}错误: {str(e)}")
  56.            return []
  57.    async def close(self):
  58.        """
  59.       关闭redis连接
  60.       """
  61.        await self.aioClient.close()
  62. async def main():
  63.    start_time = time.time()
  64.    # 使用异步事件循环执行插入操作
  65.    RA = RedisArray()
  66.    await RA.create_pool()
  67.    await RA.lpush("test22")
  68.    await RA.rpush("test23")
  69.    task1 = asyncio.ensure_future(RA.get())
  70.    task2 = asyncio.ensure_future(RA.get())
  71.    await task1
  72.    await task2
  73.    await RA.close()
  74.    print(f"操作耗时:{time.time()-start_time}")
  75. loop = asyncio.get_event_loop()
  76. loop.run_until_complete(main())
  77. # 总耗时9秒

示例中lpush,rpush,get函数中分别都加了3秒的延迟,但是总的耗时只有9秒,说明get数据是异步的。

3.2 异步mysql

  1. import time
  2. import aiomysql
  3. import asyncio
  4. async def select():
  5.    conn = await aiomysql.connect(host='192.168.1.200', port=3306, user='root', password='123456', db='test')
  6.    sql = "select * from users"
  7.    cursor = await conn.cursor()
  8.    await asyncio.sleep(3)
  9.    await cursor.execute(sql)
  10.    result = await cursor.fetchall()
  11.    await cursor.close()
  12.    conn.close()
  13.    return result
  14. async def main():
  15.    start_time = time.time()
  16.    task1 = asyncio.ensure_future(select())
  17.    task2 = asyncio.ensure_future(select())
  18.    result1 = await task1
  19.    result2 = await task2
  20.    print(result1)
  21.    print(result2)
  22.    print(f"执行耗时:{time.time() - start_time}")
  23. asyncio.run(main())
  24. # 总耗时3秒

在select函数中增加耗时3秒。main函数中两次查询MySQL的数据总共耗时3秒, 说明两次查询时异步的。

3.3 Fastapi

  1. import asyncio
  2. import uvicorn
  3. import aioredis
  4. from aioredis import Redis
  5. from fastapi import FastAPI
  6. app = FastAPI()
  7. # 创建一个redis连接池
  8. redis = aioredis.from_url('redis://192.168.1.200:6379')
  9. @app.get("/")
  10. def index():
  11.    """ 普通操作接口 """
  12.    return {"message": "Hello World"}
  13. @app.get("/red")
  14. async def red():
  15.    """ 异步操作接口 """
  16.    print("请求来了")
  17.    await asyncio.sleep(3)
  18.    # 连接池获取一个连接
  19.    async with redis.client() as conn:
  20.        await conn.set("my-key", "value")
  21.        result = await conn.get("my-key")
  22.    print(result)
  23.    return result
  24. if __name__ == '__main__':
  25.    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")

3.4 异步爬虫

  1. import aiohttp
  2. import asyncio
  3. async def fetch(session, url):
  4.    print("发送请求:", url)
  5.    async with session.get(url, verify_ssl=False) as response:
  6.        text = await response.text()
  7.        print("得到结果:", url, len(text))
  8.        return text
  9. async def main():
  10.    async with aiohttp.ClientSession() as session:
  11.        url_list = [
  12.            'https://www.baidu.com',
  13.            'https://www.baidu.com',
  14.            'https://www.baidu.com'
  15.       ]
  16.        tasks = [ asyncio.create_task(fetch(session, url)) for url in url_list]
  17.        done,pending = await asyncio.wait(tasks)
  18. if __name__ == '__main__':
  19.    asyncio.run( main() )

3.5 异步服务和socket对比

  1. # socket 服务端
  2. import socket
  3. s = socket.socket()
  4. port = 2324
  5. s.bind(("127.0.0.1", port))
  6. s.listen(5)
  7. while True:
  8. c, addr = s.accept()
  9. print('连接地址:', addr)
  10. c.send("欢迎进入!".encode(encoding="gbk"))
  11. while True:
  12. # 当多个客户端来连接的时候将阻塞在这里
  13. data = c.recv(1024)
  14. print("收到:", data.decode("gbk"))
  15. c.send(data)
  16. c.close()
  1. # socket 客户福
  2. import socket
  3. s = socket.socket()
  4. port = 2324
  5. s.connect(("127.0.0.1", port))
  6. while True:
  7. msg = s.recv(1024)
  8. print(msg.decode(encoding="gbk"))
  9. data = input()
  10. s.send(data.encode(encoding="gbk"))

这个代码只适合一个客户端和一个服务端的通信,因为当服务端接收到客服端的请求执行accept到下一个while的循环里面时会一直阻塞状态,无法接收其他的客户端的请求,想要实现多个客户端通信,并且在服务端对每一个连接开启线程。而使用aysncio则无需如此

  1. import asyncio
  2. async def script_handle(reader, writer):
  3. while True:
  4. data = await asyncio.wait_for(reader.readline(), None)
  5. if not data:
  6. print('client disconnected')
  7. writer.close() # 关闭套接字
  8. await writer.wait_closed() # 等待套接字完全关闭
  9. return
  10. print("收到:", data.decode())
  11. writer.write(b'>'+data)
  12. await writer.drain()
  13. async def main():
  14. server = await asyncio.start_server(script_handle, host='', port=2324)
  15. addr = server.sockets[0].getsockname()
  16. print(f'Serving on {addr}')
  17. async with server:
  18. await server.serve_forever()
  19. if __name__ == '__main__':
  20. asyncio.run(main())

在cmd窗口中输入telnent 127.0.0.1 2324

如图所示,当多个客户端去连接的时候,服务端会使用协程异步接收请求、处理消息

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/901979
推荐阅读
相关标签
  

闽ICP备14008679号