赞
踩
在LangChain中,流式传输(Streaming)是一种技术,它允许应用程序逐步接收和处理语言模型(LLM)生成的输出,而不是等待整个响应生成完毕后再进行处理。这种技术对于提升用户体验(UX)尤其重要,因为它可以即时提供信息,而不是让用户等待。
1. 流式传输的好处
在实际应用中,使用流式传输的好处如下:
2. 流式传输的实现方法
在LangChain中,提供了如下三种不同的流式传输方法。
3. 实现流式传输
在LangChain中,实现流式传输的基本步骤如下所示。
(1)启用流式传输:在创建语言模型实例时,设置streaming=True以启用流式传输。
(2)定义工具:创建工具(tools),这些工具可以被代理在执行任务时调用。
(3)创建代理:使用语言模型和工具创建代理,并为其配置流式传输。
(4)执行流式传输:使用stream、astream或astream_events方法执行流式传输,并处理逐步接收到的数据。
4. 流式传输的输出
流式传输的输出通常包括:
5. 自定义流式传输(Custom Streaming)
在某些情况下,开发者可能需要自定义流式传输的行为。LangChain允许通过astream_events API来捕获和流式传输更多的细节,例如:
请看下面的例子,使用库LangChain和asyncio创建并运行了一个自定义代理程序。
实例6-1:创建并运行了一个自定义代理(源码路径:codes\6\Streaming01.py)
实例文件Streaming01.py的具体实现代码如下所示。
- import asyncio
- from langchain import hub
- from langchain.agents import AgentExecutor, create_openai_tools_agent
- from langchain.tools import tool
- from langchain_openai import ChatOpenAI
- import random
- import pprint
-
- model = ChatOpenAI(temperature=0, streaming=True)
-
- @tool
- async def where_cat_is_hiding() -> str:
- return random.choice(["under the bed", "on the shelf"])
-
- @tool
- async def get_items(place: str) -> str:
- if "bed" in place:
- return "socks, shoes and dust bunnies"
- if "shelf" in place: # For 'shelf'
- return "books, pencils and pictures"
- else:
- return "cat snacks"
-
- async def main():
- await where_cat_is_hiding.ainvoke({})
- await get_items.ainvoke({"place": "shelf"})
-
- prompt = hub.pull("hwchase17/openai-tools-agent")
- tools = [get_items, where_cat_is_hiding]
- agent = create_openai_tools_agent(
- model.with_config({"tags": ["agent_llm"]}), tools, prompt
- )
- agent_executor = AgentExecutor(agent=agent, tools=tools).with_config(
- {"run_name": "Agent"}
- )
-
- chunks = []
-
- async for chunk in agent_executor.astream(
- {"input": "what's items are located where the cat is hiding?"}
- ):
- chunks.append(chunk)
- print("------")
- pprint.pprint(chunk, depth=1)
-
- asyncio.run(main())
上述代码的实现流程如下所示:
- ------
- {
- 'actions': [
- AgentAction(tool='where_cat_is_hiding', tool_input={}, log='Invoking: `where_cat_is_hiding` with `{}`')
- ],
- 'messages': [
- AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_id_1', 'function': {'arguments': '{}', 'name': 'where_cat_is_hiding'}, 'type': 'function'}]})
- ]
- }
- ------
- {
- 'messages': [
- FunctionMessage(content='on the shelf', name='where_cat_is_hiding')
- ],
- 'steps': [
- AgentStep(action=AgentAction(...), observation='on the shelf')
- ]
- }
- ------
- {
- 'actions': [
- AgentAction(tool='get_items', tool_input={'place': 'shelf'}, log='Invoking: `get_items` with `{"place": "shelf"}`')
- ],
- 'messages': [
- AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 1, 'id': 'call_id_2', 'function': {'arguments': '{"place": "shelf"}', 'name': 'get_items'}, 'type': 'function'}]})
- ]
- }
- ------
- {
- 'messages': [
- FunctionMessage(content='books, pencils and pictures', name='get_items')
- ],
- 'steps': [
- AgentStep(action=AgentAction(...), observation='books, pencils and pictures')
- ]
- }
- ------
- {
- 'messages': [
- AIMessage(content='The items located where the cat is hiding on the shelf are books, pencils, and pictures.')
- ],
- 'output': 'The items located where the cat is hiding on the shelf are books, pencils, and pictures.'
- }
在接下来的内容中,我们可以进一步操作上面的自定义代理,增加他的功能。
1. 消息(Messages)
在LangChain中,代理的输出包含了丰富的信息,包括动作(actions)、观察(observations)、步骤(steps)和最终答案(output)。除了这些结构化的数据,代理的交互还可以通过消息(messages)来访问和展示。每个代理输出的块(chunk)都包含了一个或多个消息,这些消息记录了代理执行过程中的通信。例如,在下面的代码中调用chunks[0]["actions"]时,可以看到代理执行的第一个动作是调用where_cat_is_hiding工具。
- chunks[0]["actions"]
- for chunk in chunks:
- print(chunk["messages"])
执行后会输出:
- [AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_pKy4OLcBx6pR6k3GHBOlH68r', 'function': {'arguments': '{}', 'name': 'where_cat_is_hiding'}, 'type': 'function'}]})]
- [FunctionMessage(content='on the shelf', name='where_cat_is_hiding')]
- [AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_qZTz1mRfCCXT18SUy0E07eS4', 'function': {'arguments': '{\n "place": "shelf"\n}', 'name': 'get_items'}, 'type': 'function'}]})]
- [FunctionMessage(content='books, penciles and pictures', name='get_items')]
- [AIMessage(content='The items located where the cat is hiding on the shelf are books, pencils, and pictures.')]
2. 使用AgentAction/Observation
LangChain的代理输出不仅包含了消息,还包含了更丰富的结构化信息,这些信息被包含在动作(actions)和步骤(steps)中。这些结构化信息在某些情况下非常有用,但也可能会更难解析。
在LangChain中,可以使用AgentAction和Observation来处理代理输出中的结构化信息,这些信息可以用于构建更复杂的应用程序。AgentAction和Observation的结构如下:
例如下面是一个异步流式传输的例子,展示了逐步处理代理输出的用法。
- async for chunk in agent_executor.astream(
- {"input": "what's items are located where the cat is hiding?"}
- ):
- # 代理动作
- if "actions" in chunk:
- for action in chunk["actions"]:
- print(f"Calling Tool: `{action.tool}` with input `{action.tool_input}`")
- # 观察结果
- elif "steps" in chunk:
- for step in chunk["steps"]:
- print(f"Tool Result: `{step.observation}`")
- # 最终结果
- elif "output" in chunk:
- print(f'Final Output: {chunk["output"]}')
- else:
- raise ValueError()
- print("---")
执行后会输出:
- Calling Tool: `where_cat_is_hiding` with input `{}`
- ---
- Tool Result: `on the shelf`
- ---
- Calling Tool: `get_items` with input `{'place': 'shelf'}`
- ---
- Tool Result: `books, pencils and pictures`
- ---
- Final Output: The items located where the cat is hiding on the shelf are books, pencils, and pictures.
- ---
3. 使用astream_events API自定义流式传输
在LangChain框架中,astream_events API 提供了一种灵活的方法来实时传递代理执行过程中的事件流。这对于需要精细控制代理行为和定制用户界面的应用场景非常有帮助。通过这个API,开发者可以捕获和响应代理操作的各个阶段,包括代理启动、工具调用的开始和结束,以及最终答案的逐个令牌流式传输。
通过使用astream_events API,开发者可以根据特定的应用需求,定制化地处理这些事件。例如,可以在代理开始处理输入时显示一个加载指示器,在工具完成执行时更新界面,或者在最终答案生成时高亮显示结果。这种细粒度的控制使得用户界面能够更加动态和响应式,提供更好的用户体验。
例如下面的代码展示了使用astream_events API实现异步流式传输和处理代理执行的事件的过程。
- async for event in agent_executor.astream_events(
- {"input": "where is the cat hiding? what items are in that location?"},
- version="v1",
- ):
- kind = event["event"]
- if kind == "on_chain_start":
- if event["name"] == "Agent":
- print(f"Starting agent: {event['name']} with input: {event['data'].get('input')}")
- elif kind == "on_chain_end":
- if event["name"] == "Agent":
- print()
- print("--")
- print(f"Done agent: {event['name']} with output: {event['data'].get('output')['output']}")
- if kind == "on_chat_model_stream":
- content = event["data"]["chunk"].content
- if content:
- print(content, end="|")
- elif kind == "on_tool_start":
- print("--")
- print(f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}")
- elif kind == "on_tool_end":
- print(f"Done tool: {event['name']}")
- print(f"Tool output was: {event['data'].get('output')}")
- print("--")
在上述代码中,使用async for循环遍历agent_executor.astream_events返回的事件流。对于每种类型的事件,代码中都有一个对应的if或elif分支来处理。这允许开发者根据事件的类型执行特定的操作,例如打印日志、更新界面或者执行其他业务逻辑。在上述代码中,事件的处理流程如下:
执行后会输出:
- Starting agent: Agent with input: {'input': 'where is the cat hiding? what items are in that location?'}
- --
- Starting tool: where_cat_is_hiding with inputs: {}
- Done tool: where_cat_is_hiding
- Tool output was: on the shelf
- --
- --
- Starting tool: get_items with inputs: {'place': 'shelf'}
- Done tool: get_items
- Tool output was: books, pencils and pictures
- --
- The| cat| is| currently| hiding| on| the| shelf|.| In| that| location|,| you| can| find| books|,| pencils|,| and| pictures|.|
- --
- Done agent: Agent with output: The cat is currently hiding on the shelf. In that location, you can find books, pencils, and pictures.
注意:由于astream_events是一个测试版API,可能会根据用户反馈和使用情况进行调整,因此在使用时需要注意可能的更新和变化。同时,为了确保所有回调都能正常工作,建议在应用中使用异步代码,并避免混用同步版本的工具。
在LangChain中,如果你的工具需要使用LangChain的可运行对象(例如LLMs、检索器等),并且希望从这些对象中流式传输事件,那么需要确保回调(callbacks)被正确传递。例如下面的代码演示了在工具内部实现流式传输事件的过程。
- @tool
- async def get_items(place: str, callbacks: Callbacks) -> str:
- ""“使用此工具查询给定地点可能存放的物品。”""
- # 创建一个提示模板并配置LLM
- template = ChatPromptTemplate.from_messages([
- ("human", f"你能告诉我在'{place}'这个地方可能找到哪些物品吗?")
- ])
- chain = template | model.with_config(
- {"run_name": "Get Items LLM", "tags": ["tool_llm"], "callbacks": callbacks}
- )
- # 异步流式传输链生成的文本块
- chunks = [chunk async for chunk in chain.astream({"place": place})]
- return "".join(chunk.content for chunk in chunks)
-
- # 初始化代理并使用astream_events来监听事件
- async for event in agent_executor.astream_events({"input": "where is the cat hiding?"}, version="v1"):
- # 根据事件类型处理事件
- if event["event"] == "on_chain_start":
- # 代理或工具启动时的逻辑
- elif event["event"] == "on_chain_end":
- # 代理或工具结束时的逻辑
- # ... 其他事件类型的处理逻辑
在工具内部的流式传输事件是一种强大的技术,它允许开发者实时监控和响应LangChain可运行对象的行为。通过传递回调和使用LangChain的流式传输API,可以构建更加动态和交互性强的应用,这对于需要实时反馈或复杂用户界面的应用尤其有用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。