赞
踩
原文:读懂 FastChat 大模型部署源码所需的异步编程基础 - 知乎
目录
0. 前言
1. 同步与异步的区别
2. 协程
3. 事件循环
4. await
5. 组合协程
6. 使用 Semaphore 限制并发数
7. 运行阻塞任务
8. 异步迭代器 async for
9. 异步上下文管理器 async with
10. 参考
本文是读懂 FastChat 大模型部署源码系列的第二篇,持续更新中,欢迎关注:
不理不理:读懂 FastChat 大模型部署源码所需的 Web 基础
不理不理:读懂 FastChat 大模型部署源码所需的异步编程基础
如果觉得本文有帮助,麻烦点个小小的赞~可以让更多人看到,谢谢大家啦~
FastChat 是 2023 年非常知名的一个大语言模型项目,该项目不仅提供了大语言模型全量参数微调、Lora参数微调、模型推断、模型量化、模型部署及调度等全套的源代码,而且还开源了他们基于 LLaMA2 底座进行指令微调的一系列 Vicuna 模型权重,因此非常适合学习和使用。
就 FastChat 模型部署部分而言,它分为三个部分:controller、worker、api_server。这三个服务使用 FastAPI + Uvicorn 的方式构建,都是单线程程序,且各自都支持并发
本文将会分享读懂 FastChat 模型部署源码的异步编程基础,绝不超纲(纲是 FastChat)
在传统的同步编程中,代码按照顺序逐行执行,前一个操作完成后才能执行下一个操作。若有一些耗时的操作则会导致整个程序的阻塞,降低程序的性能和响应能力。
而在异步编程中,当遇到耗时的操作(比如 IO)时不会等待操作完成,而是继续执行其他代码。这在有多个用户并发请求的情况下,异步方式编写的接口可以在 IO 等待的过程中去处理其他请求,从而提高程序的性能。
比方说我们去网上下载三张图片,使用同步编程实现:
- import requests # requests 是仅支持同步编程的http请求库
-
- def download_img(url):
- print("开始下载:", url)
- response = requests.get(url) # 发送请求, 下载图片
- file_name = url.rsplit('_')[-1]
- with open(file_name, mode='wb') as f: # 将图片保存到本地
- f.write(response.content)
- print("下载完成")
-
- if __name__ == '__main__':
- url_list = [
- 'https://img.zcool.cn/community/014bbb554355150000019ae919906f.jpg@1280w_1l_2o_100sh.jpg',
- 'https://pic.ntimg.cn/20131112/10370254_003820848001_2.jpg',
- 'https://pic.ntimg.cn/file/20170926/9885883_140434796000_2.jpg'
- ]
- for item in url_list:
- download_img(item)
-
- # 运行结果:
- # 开始下载: https://img.zcool.cn/community/014bbb554355150000019ae919906f.jpg@1280w_1l_2o_100sh.jpg
- # 下载完成
- # 开始下载: https://pic.ntimg.cn/20131112/10370254_003820848001_2.jpg
- # 下载完成
- # 开始下载: https://pic.ntimg.cn/file/20170926/9885883_140434796000_2.jpg
- # 下载完成
假设每下载一张图片需要 1 秒,那么上述程序完整执行大概需要 3 秒,时间主要花在了 IO 部分。
若使用异步编程,则只需 1 秒左右(代码看不懂可忽略)。
- import aiohttp # aiohttp 是支持异步编程的http请求库
- import asyncio
-
- async def fetch(sess, url):
- print("发送请求: ", url)
- async with sess.get(url, verify_ssl=False) as response:
- print("等待响应: ", url)
- content = await response.content.read()
- file_name = url.rsplit('/')[-1]
- print("开始写入: ", file_name)
- with open(file_name, mode='wb') as f:
- f.write(content)
-
- async def main():
- async with aiohttp.ClientSession() as sess:
- url_list = [
- 'https://img.zcool.cn/community/014bbb554355150000019ae919906f.jpg@1280w_1l_2o_100sh.jpg',
- 'https://pic.ntimg.cn/20131112/10370254_003820848001_2.jpg',
- 'https://pic.ntimg.cn/file/20170926/9885883_140434796000_2.jpg'
- ]
- tasks = [asyncio.create_task(fetch(sess, url)) for url in url_list]
- await asyncio.wait(tasks)
-
- if __name__ == '__main__':
- asyncio.run(main())
-
- # 运行结果:
- # 发送请求: https://img.zcool.cn/community/014bbb554355150000019ae919906f.jpg@1280w_1l_2o_100sh.jpg
- # 发送请求: https://pic.ntimg.cn/20131112/10370254_003820848001_2.jpg
- # 发送请求: https://pic.ntimg.cn/file/20170926/9885883_140434796000_2.jpg
- # 等待响应: https://pic.ntimg.cn/file/20170926/9885883_140434796000_2.jpg
- # 开始写入: 9885883_140434796000_2.jpg
- # 等待响应: https://pic.ntimg.cn/20131112/10370254_003820848001_2.jpg
- # 等待响应: https://img.zcool.cn/community/014bbb554355150000019ae919906f.jpg@1280w_1l_2o_100sh.jpg
- # 开始写入: 014bbb554355150000019ae919906f.jpg@1280w_1l_2o_100sh.jpg
- # 开始写入: 10370254_003820848001_2.jpg
异步程序会在发送第一张图的下载请求时,不等结果的返回就开始第二张图的下载;会在发送第二张图的下载请求时,依然不等结果的返回就开始第三张图的下载。
像上面这种经过 async 和 await 关键字装饰的函数,称之为基于协程的异步函数,这种编程方式也叫异步编程。异步编程是通过让一个线程在执行某个任务的 IO 等待时间去执行其他任务,从而实现并发。
定义形式为 async def 的函数称之为协程(异步函数)。
- # 定义一个协程函数
- async def func():
- pass
调用协程只会创建协程对象,不会执行函数内部的代码。若想执行协程的内部代码,须配合事件循环一起使用。
协程的优势在于,能够在 IO 等待时执行其他协程,当 IO 操作结束后会自动回调至原先协程,这样就可以在节省资源的同时提高性能。另外,协程也让原本需要用异步+回调方式完成的非人类代码,用看似同步的方式写出来。
事件循环,可以把他当做是一个 while 循环,这个 while 循环会周期性的运行并执行一些任务,然后在特定条件下终止循环。
- # 伪代码
- 任务列表 = [ 任务1, 任务2, 任务3,... ]
- while True:
- 可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将'可执行'和'已完成'的任务返回
- for 已就绪任务 in 已准备就绪的任务列表:
- 执行 已就绪任务
- for 已完成的任务 in 已完成的任务列表:
- 在任务列表中移除 已完成的任务
- 如果 任务列表 中的任务都已完成,则终止循环
如果想要执行协程函数内部的代码,需要事件循环和协程对象配合才能实现,如:
- import asyncio
-
- async def func():
- print("inner code")
-
- # 方式一
- # loop = asyncio.get_event_loop() # 创建一个事件循环
- # loop.run_until_complete(func()) # 将协程当做任务提交到事件循环的任务列表中,协程执行完成之后终止
-
- # 方式二
- asyncio.run(func()) # 是一个简便的写法,与方式一本质相同
-
- # 运行结果:
- # inner code
asyncio 是 Python3.4 中的新增模块,它提供了一种机制,使得你可以用协程、IO 复用在单线程环境中编写并发模型。
上述代码可以简单理解为:将协程当做任务添加到事件循环的任务列表,然后事件循环会检测列表中的协程是否已准备就绪(默认可理解为就绪状态),如果准备就绪则执行其内部代码。
await 是在 Python3.5 中引入的关键字,表示等待其后对象运行结束,后面可接协程对象、Task 对象(封装后的协程对象)、IO 操作。
值得注意的是,要调用协程,必须使用 await 关键字;另外也不能在同步函数里使用 await,否则会报错。下面使用异步编程方式执行 1+2=3 的计算过程。
- import asyncio
-
- async def compute(x, y):
- print("Compute %s + %s ..." % (x, y))
- await asyncio.sleep(1.0)
- return x + y
-
- async def print_sum(x, y):
- print("Start compute ...")
- result = await compute(x, y)
- print("%s + %s = %s" % (x, y, result))
-
- asyncio.run(print_sum(1, 2))
-
- # 运行结果:
- # Start compute ...
- # Compute 1 + 2 ...
- # 1 + 2 = 3
上述示例在 IO 等待时无法演示切换到其他任务的效果,难以体会到协程的优势,要想在程序中创建多个任务对象,就需要使用 Task。
通过 asyncio.create_task(协程对象) 方式创建 Task 对象,这样可以让协程加入事件循环中等待被调度执行。
asyncio.gather(任务列表) 会将任务列表中传入的一系列任务合并成一个组合协程,其内部也是异步执行的。组合协程总的执行时间取决于任务列表中最耗时的那个任务,同时也只有当任务列表中的所有任务都执行完毕,才能返回主协程挂起处继续执行剩余的代码。
- import asyncio
-
- async def func(n):
- print("start: ", n)
- await asyncio.sleep(n * 0.1) # 模拟IO操作
- print("end: ", n)
- return n
-
- async def main():
- # 按顺序在任务列表中分别添加四个任务
- tasks = [asyncio.create_task(func(i)) for i in range(1, 5)]
- # 组合协程的执行时间取决于tasks中最耗时的那个任务
- complete = await asyncio.gather(*tasks)
- # 只有所有任务都执行完毕才能执行下面的语句
- # 返回值顺序同tasks内部元素的定义顺序
- for i in complete:
- print("当前数字: ", i)
-
- asyncio.run(main())
-
- # 运行结果:
- # start: 1
- # start: 2
- # start: 3
- # start: 4
- # end: 1
- # end: 2
- # end: 3
- # end: 4
- # 当前数字: 1
- # 当前数字: 2
- # 当前数字: 3
- # 当前数字: 4
由于异步编程也仅是单线程运行,为了防止服务超载,我们有时候需要使用 asyncio.Semaphore(n) 限制最大并发数量。
asyncio.Semaphore(n) 内部管理一个计数器,计数器的初始值为 n,即最大并发数量。该计数器由 acquire() 调用递减,release() 调用递增,且计数器永远不会低于零。
如果并发数没有达到上限,那么 acquire() 会瞬间执行完成,进入正式代码中。如果并发数已经达到了限制,那么其他的协程会阻塞在 acquire() 这个地方,直到正在运行的某个协程调用 release(),才会放行一个新的协程。
- import asyncio
- from datetime import datetime
-
- async def func(n, semaphore):
- print(f"time: {datetime.now().strftime('%H:%M:%S')} func {n} enter") # 第5行
- # ----------------------------------------------------------------------------------
- await semaphore.acquire() # 第7行
- print(f"time: {datetime.now().strftime('%H:%M:%S')} func {n} start") # 第8行
- await asyncio.sleep(2) # 第9行
- print(f"time: {datetime.now().strftime('%H:%M:%S')} func {n} Semaphore(value={semaphore._value}, locked={semaphore.locked()})")
- semaphore.release() # 第11行
- # ----------------------------------------------------------------------------------
- # 横线里的这段代码等价于
- # async with semaphore:
- # print(f"time: {datetime.now().strftime('%H:%M:%S')} func {n} start")
- # await asyncio.sleep(2)
- # print(f"time: {datetime.now().strftime('%H:%M:%S')} func {n} Semaphore(value={semaphore._value}, locked={semaphore.locked()})")
- print(f"time: {datetime.now().strftime('%H:%M:%S')} func {n} end") # 第18行
- return n
-
- async def main():
- semaphore = asyncio.Semaphore(3)
- tasks = [asyncio.create_task(func(i, semaphore)) for i in range(6)]
- complete = await asyncio.gather(*tasks)
- for i in complete:
- print("当前数字: ", i)
-
- asyncio.run(main())
-
- # 运行结果:
- # time: 14:15:29 func 0 enter
- # time: 14:15:29 func 0 start
- # time: 14:15:29 func 1 enter
- # time: 14:15:29 func 1 start
- # time: 14:15:29 func 2 enter
- # time: 14:15:29 func 2 start
- # time: 14:15:29 func 3 enter
- # time: 14:15:29 func 4 enter
- # time: 14:15:29 func 5 enter
- # time: 14:15:31 func 0 Semaphore(value=0, locked=True)
- # time: 14:15:31 func 0 end
- # time: 14:15:31 func 1 Semaphore(value=1, locked=False)
- # time: 14:15:31 func 1 end
- # time: 14:15:31 func 2 Semaphore(value=2, locked=False)
- # time: 14:15:31 func 2 end
- # time: 14:15:31 func 3 start
- # time: 14:15:31 func 4 start
- # time: 14:15:31 func 5 start
- # time: 14:15:33 func 3 Semaphore(value=0, locked=True)
- # time: 14:15:33 func 3 end
- # time: 14:15:33 func 4 Semaphore(value=1, locked=False)
- # time: 14:15:33 func 4 end
- # time: 14:15:33 func 5 Semaphore(value=2, locked=False)
- # time: 14:15:33 func 5 end
- # 当前数字: 0
- # 当前数字: 1
- # 当前数字: 2
- # 当前数字: 3
- # 当前数字: 4
- # 当前数字: 5
阻塞任务是指阻止当前线程继续进行的任务,如果在 asyncio 程序中执行阻塞任务,它会停止整个事件循环,从而阻止其他协程在后台运行。
我们可以通过 asyncio.to_thread(func()) 函数在程序中另开一个单独的线程,异步运行阻塞任务,该函数返回一个可被等待以获取 func() 最终结果的协程。
- import asyncio
- import time
- import datetime
-
- def blocking_task():
- print(f"time: {datetime.datetime.now().strftime('%H:%M:%S')} blocking_task start")
- time.sleep(5)
- print(f"time: {datetime.datetime.now().strftime('%H:%M:%S')} blocking_task end")
- return "blocking_task Done"
-
- async def func(n):
- print(f"time: {datetime.datetime.now().strftime('%H:%M:%S')} func {n} start")
- await asyncio.sleep(2)
- print(f"time: {datetime.datetime.now().strftime('%H:%M:%S')} func {n} end")
- return f"func {n} Done"
-
- async def main():
- # 可以尝试用blocking_task(); tasks = [asyncio.create_task(func(i)) for i in range(3)]替换下面这句, 看看会发生什么
- tasks = [asyncio.to_thread(blocking_task)] + [asyncio.create_task(func(i)) for i in range(3)]
- complete = await asyncio.gather(*tasks)
- for i in complete:
- print("当前结果: ", i)
-
- asyncio.run(main())
-
- # 运行结果:
- # time: 15:00:53 func 0 start
- # time: 15:00:53 func 1 start
- # time: 15:00:53 func 2 start
- # time: 15:00:53 blocking_task start
- # time: 15:00:55 func 0 end
- # time: 15:00:55 func 1 end
- # time: 15:00:55 func 2 end
- # time: 15:00:58 blocking_task end
- # 当前结果: blocking_task Done
- # 当前结果: func 0 Done
- # 当前结果: func 1 Done
- # 当前结果: func 2 Done
可以看到,另起一个线程异步运行阻塞任务时,完全不影响事件循环中其他协程的运行。
for 循环遍历一个可迭代对象时,遍历过程中无法执行其他任务,而 async for 语法允许我们在异步环境下遍历可迭代对象。
- import asyncio
-
- async def a_generator():
- for i in range(3):
- await asyncio.sleep(1) # 模拟IO耗时操作
- yield i
-
- async def iter_func():
- print(f"iter_func start")
- async for item in a_generator():
- print(item)
- print(f"iter_func end")
- return 'iter_func Done'
-
- async def func(i):
- print(f"func{i} start")
- await asyncio.sleep(1.5) # 模拟IO耗时操作
- print(f"func{i} end")
- return f'func{i} Done'
-
- async def main():
- tasks = [asyncio.create_task(func(1)), asyncio.create_task(iter_func()), asyncio.create_task(func(2))]
- complete = await asyncio.gather(*tasks)
- for i in complete:
- print("当前结果: ", i)
-
- asyncio.run(main())
-
- # 运行结果:
- # func1 start
- # iter_func start
- # func2 start
- # 0
- # func1 end
- # func2 end
- # 1
- # 2
- # iter_func end
- # 当前结果: func1 Done
- # 当前结果: iter_func Done
- # 当前结果: func2 Done
从上述示例中可以看出,运行 async for item in a_generator() 时不会因为还没迭代结束就阻塞事件循环,而是会在 IO 时协调运行其他协程。
使用 with 进行上下文管理,解释器会在进入时自动调用 __enter__ 方法,退出时调用 __exit__ 方法。整个流程顺序执行,因而无法在 __enter__ 与 do_something()、do_something() 与 __exit__ 之间穿插其他任务。
- class Example:
- def __enter__(self):
- print('enter') #进入资源
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- print('exit') #释放资源
-
- def do_something(self):
- print('do_something')
-
- with Example() as example:
- example.do_something()
-
- # 运行结果:
- # enter
- # do something
- # exit
async with 称为异步上下文管理器,能够将其进入的 __enter__ 和退出的 __exit__ 函数暂时挂起,以执行事件循环中的其他协程。为了实现这样的功能,需要加入两个新的方法:__aenter__ 和 __aexit__,这两个方法都需要返回 awaitable 类型的值。
- import asyncio
-
- async def log(text):
- await asyncio.sleep(0.1)
- print(text)
-
- class AsyncContextManager:
- async def __aenter__(self):
- await log('entering context')
-
- async def __aexit__(self, exc_type, exc, tb):
- await log('exiting context')
-
- async def run_async_with():
- print("async with func start")
- async with AsyncContextManager() as c:
- print("使用 async with 来管理异步上下文")
- print("async with func end")
- return f"async with func Done"
-
- async def func(i):
- print(f"func{i} start")
- await asyncio.sleep(1) # 模拟IO耗时操作
- print(f"func{i} end")
- return f'func{i} Done'
-
- async def main():
- tasks = [
- asyncio.create_task(func(1)),
- asyncio.create_task(run_async_with()),
- asyncio.create_task(func(2))
- ]
- complete = await asyncio.gather(*tasks)
- for i in complete:
- print("当前结果: ", i)
-
- asyncio.run(main())
-
- # 运行结果:
- # func1 start
- # async with func start
- # func2 start
- # entering context
- # 使用 async with 来管理异步上下文
- # exiting context
- # async with func end
- # func1 end
- # func2 end
- # 当前结果: func1 Done
- # 当前结果: async with func Done
- # 当前结果: func2 Done
【精选】多任务编程事件循环_fastapi事件循环代码入口_发呆的比目鱼的博客-CSDN博客
从0到1,Python异步编程的演进之路 - 知乎 (zhihu.com)
Python协程 & 异步编程(asyncio) 入门介绍 - 知乎 (zhihu.com)
一份详细的asyncio入门教程 - 知乎 (zhihu.com)
Python 为什么需要async for和async with|极客笔记 (deepinout.com)
with与async with - 简书 (jianshu.com)
Python 3.5+ 协程 ( coroutines ) 之 async with 表达式 - 简单教程,简单编程 (twle.cn)
Python asyncio.Semaphore用法及代码示例 - 纯净天空 (vimsky.com)
如何使用 asyncio 限制协程的并发数 - 侃豺小哥 - 博客园 (cnblogs.com)
Python 异步: 在 Asyncio 中运行阻塞任务(14) - 知乎 (zhihu.com)
如有错误,欢迎指正!近期也在加紧制作一期 FastChat 大模型部署时的并发及调度原理详解,敬请期待
发布于 2023-11-17 01:18・IP 属地上海
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。