赞
踩
langchain的源码中有很多异步操作,如下图,如果对python异步机制不了解,我们就无法看明白langchain的源码。
Python 中的协程(coroutine)是异步编程的一种强大工具,可以用于编写高效的并发代码。在本文中,我们将以一个简单的示例代码为例,介绍 Python 中的协程及其使用方法。
协程是一种轻量级的线程,可以在程序中实现并发执行的任务。与线程不同的是,协程是由程序员自行控制的,并且可以在运行时挂起和恢复,而不需要操作系统的干预。这使得协程更加高效,可以避免线程切换时的开销和竞争条件。
示例代码
import asyncio from typing import List, Awaitable # 定义一个异步函数,接受一个整数参数n,并返回一个可等待对象(异步字符串) async def example_async_function(n: int) -> str: # 定义一个内部异步函数inner_coroutine,接受一个整数参数n1,并返回一个字符串 async def inner_coroutine(n1: int) -> str: print("Start inner coroutine", n1) # 打印开始内部协程的消息和参数 await asyncio.sleep(5) # 模拟一个耗时的操作,暂停5秒钟 print("End inner coroutine", n1) # 打印结束内部协程的消息和参数 return f"Result {n1}" # 返回一个字符串,表示内部协程的结果 # 返回调用内部协程的结果,等待内部协程执行完毕 return await inner_coroutine(n) # 定义一个异步函数main,不接受参数 async def main(): tasks = [example_async_function(i) for i in range(3)] # 创建一组异步任务列表,包含3个调用example_async_function的协程对象 results = await asyncio.gather(*tasks) # 并行执行多个协程,并等待它们全部完成,返回执行结果列表 for result in results: # 遍历执行结果列表 print("Async function returned:", result) # 打印每个协程的执行结果 asyncio.run(main()) # 运行主协程
效果
由图片看出,三个函数是并行运行的,Start inner* 这句话都出现在了 End inner* 的前面
定义协程函数: 在 Python 中,使用 async def
关键字定义一个协程函数。协程函数与普通函数类似,但可以包含 await
关键字,用于等待异步操作的完成。
调用协程函数: 在代码中调用协程函数时,不会立即执行它,而是返回一个协程对象。要执行协程函数,需要使用 await
关键字等待其执行结果,或者使用 asyncio.run()
函数来运行主协程。
并发执行多个协程: Python 提供了 asyncio.gather()
函数,可以并行执行多个协程,并等待它们全部完成。通过将多个协程对象传递给 asyncio.gather()
函数,可以实现并发执行多个任务。
高效利用资源: 协程可以在一个线程内并发执行多个任务,避免了线程切换的开销,提高了程序的性能和效率。
简化并发编程: 协程使用了 await
和 asyncio
等异步编程工具,使得编写并发代码变得更加简洁和易于理解。
避免竞争条件: 协程是由程序员自行控制的,可以避免多线程编程中常见的竞争条件和同步问题,提高了代码的可靠性和稳定性。
asyncio.run()
和 asyncio.gather()
是 asyncio 库中两个不同的函数,它们的作用和用法略有不同。
1) asyncio.run()
作用: asyncio.run()
函数用于运行一个 asyncio 程序的主协程。
用法: 你将你的主协程作为 asyncio.run()
的参数传递给它,它会运行这个协程直到完成,然后关闭 asyncio 事件循环。
示例: 以下是一个简单的示例,演示了如何使用 asyncio.run()
运行一个主协程:
import asyncio
async def main():
print("Hello, world!")
asyncio.run(main())
2) asyncio.gather()
作用: asyncio.gather()
函数用于并行执行多个协程,并等待它们全部完成。
用法: 你将多个协程对象作为参数传递给 asyncio.gather()
函数,它会并发执行这些协程,然后等待它们全部完成,并返回一个结果列表。
示例: 以下是一个简单的示例,演示了如何使用 asyncio.gather()
并行执行多个协程:
import asyncio
async def coro1():
await asyncio.sleep(1)
return "Result 1"
async def coro2():
await asyncio.sleep(2)
return "Result 2"
async def main():
results = await asyncio.gather(coro1(), coro2())
print("Results:", results)
asyncio.run(main())
3) 区别
asyncio.run()
是用于运行主协程的函数,而 asyncio.gather()
是用于并行执行多个协程的函数。
asyncio.run()
只能运行一个主协程,而 asyncio.gather()
可以并行执行多个协程,并等待它们全部完成。
总的来说,asyncio.run()
用于运行整个 asyncio 程序的主协程,而 asyncio.gather()
用于并行执行多个协程,并等待它们全部完成。
asyncio.get_event_loop()通常这么用,它也是用来运行主线程的
loop = asyncio.get_event_loop();
loop.run_until_complete(test())
asyncio.run(test())
和 loop = asyncio.get_event_loop(); loop.run_until_complete(test())
在功能上是相似的,都用于运行一个 asyncio 程序的主协程。但是,它们之间有一些细微的差别:
1) asyncio.run()
:
asyncio.run(test())
会自动创建和管理事件循环,运行指定的主协程,然后在协程完成后关闭事件循环。2) asyncio.get_event_loop()
+ loop.run_until_complete()
:
asyncio.get_event_loop()
获取当前线程的事件循环,然后调用 run_until_complete()
方法来运行指定的协程。loop.close()
来手动关闭事件循环。asyncio.run()
。因此,一般情况下,推荐使用 asyncio.run()
来运行主协程,因为它更加简洁、方便,并且会自动处理事件循环的创建和关闭。
开篇例子里的任务是这样创建的,然后使用asyncio.gather来执行
tasks = [example_async_function(i) for i in range(3)] # 创建一组异步任务列表,包含3个调用example_async_function的协程对象
results = await asyncio.gather(*tasks)
也可以通过asyncio.create_task来创建任务
import asyncio from datetime import datetime from typing import List, Awaitable def getCurrentTime(): # 定义一个函数,用于获取当前时间的毫秒数 current_time = datetime.now() # 获取当前时间 milliseconds = int(current_time.timestamp() * 1000) # 将当前时间转换为毫秒数 return str(milliseconds) # 返回毫秒数的字符串表示 async def example_async_function(n: int) -> str: async def inner_coroutine(n1: int) -> str: print("Start inner coroutine", n1, getCurrentTime()) # 打印开始内部协程的消息和时间 await asyncio.sleep(10) # 模拟一个耗时的操作,暂停10秒钟 print("End inner coroutine", n1, getCurrentTime()) # 打印结束内部协程的消息和时间 return f"Result {n1}" # 返回一个字符串,表示内部协程的结果 return await inner_coroutine(n) # 等待内部协程执行完毕,并返回结果 async def main(): tasks = [] # 定义一个空列表,用于存储创建的任务 print("Start creating tasks", getCurrentTime()) # 打印开始创建任务的消息和时间 for i in range(3): task = asyncio.create_task(example_async_function(i), name="task" + str(i)) # 创建一个任务,并添加到任务列表中 tasks.append(task) print("End creating tasks", getCurrentTime()) # 打印结束创建任务的消息和时间 # 等待所有任务完成 done, _ = await asyncio.wait(tasks) # 等待所有任务完成,并获取已完成的任务列表 print("task done", getCurrentTime()) # 打印任务完成的消息和时间 # 获取任务的执行结果 results = [task.result() for task in done] # 获取已完成任务的执行结果 # 打印执行结果 for result in results: print("Async function returned:", result, getCurrentTime()) # 打印每个协程的执行结果和时间 asyncio.run(main()) # 运行主协程
他们没什么区别,只是写法不同。在langchain里两种写法都有出现
看完了这篇代码再回过去看相关的异步代码,是不是就觉得容易了许多呢?
Python 中的协程是一种强大的异步编程工具,可以帮助我们编写高效、可维护的并发代码。通过合理地使用协程,我们可以充分发挥 Python 的异步编程能力,实现更加优雅和高效的程序设计。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。