赞
踩
为了尽可能简化创建自定义链的过程,我们实现了一个“Runnable”协议,这是一个标准接口,可以轻松定义自定义链并以标准方式调用它们。
标准接口包括:
stream
:流式返回响应的块invoke
:在输入上调用链batch
:在输入列表上调用链这些方法对应的异步方法:
astream
:异步流式返回响应的块ainvoke
:异步在输入上调用链abatch
:异步在输入列表上调用链astream_log
:异步流式返回中间步骤,以及最终响应astream_events
:beta 异步流式返回链中发生的事件(在 langchain-core
0.1.14 中引入)输入类型和输出类型因组件而异:
组件 | 输入类型 | 输出类型 |
---|---|---|
Prompt | 字典 | PromptValue |
ChatModel | 单个字符串、聊天消息列表或 PromptValue | ChatMessage |
LLM | 单个字符串、聊天消息列表或 PromptValue | 字符串 |
OutputParser | LLM 或 ChatModel 的输出 | 取决于解析器 |
Retriever | 单个字符串 | 文档列表 |
Tool | 单个字符串或字典,取决于工具 | 取决于工具 |
所有可运行对象都公开输入和输出的模式以检查输入和输出:
input_schema
: 从 Runnable 的结构动态生成的输入 Pydantic 模型output_schema
: 从 Runnable 的结构动态生成的输出 Pydantic 模型示例:创建一个简单的PromptTemplate + ChatModel链
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
chain = prompt | model
Runnable 接受的输入的描述。 这是从任何 Runnable 的结构动态生成的 Pydantic 模型,可以调用 .schema()
来获取其 JSONSchema 表示形式。
# 链的输入模式是其第一个部分(prompt)的输入模式。
chain.input_schema.schema()
prompt.input_schema.schema()
model.input_schema.schema()
对由可运行对象产生的输出的描述。 这是根据任何可运行对象的结构动态生成的 Pydantic 模型,可以调用 .schema()
来获取 JSONSchema 表示形式。
# 链的输出模式是其最后一部分的输出模式,本例中是 ChatModel,它输出一个 ChatMessage
chain.output_schema.schema()
for s in chain.stream({"topic": "bears"}):
print(s.content, end="", flush=True)
输出:
Why don't bears wear shoes?
Because they prefer bear feet!
chain.invoke({"topic": "bears"})
AIMessage(content='Why did the bear bring a flashlight to the party? \n\nBecause he heard it was going to be a "beary" dark and wild night!')
chain.batch([{"topic": "bears"}, {"topic": "cats"}])
[AIMessage(content='Why did the bear bring a flashlight to the party? \nBecause he wanted to be the "light" of the party!'),
AIMessage(content='Why was the cat sitting on the computer?\nBecause it wanted to keep an eye on the mouse!')]
还可以使用 max_concurrency
参数设置并发请求数
chain.batch([{"topic": "bears"}, {"topic": "cats"}], config={"max_concurrency": 5})
async for s in chain.astream({"topic": "bears"}):
print(s.content, end="", flush=True)
Why did the bear dissolve in water?
Because it was polar!
await chain.ainvoke({"topic": "bears"})
AIMessage(content="Why did the bear break up with his girlfriend?\n\nBecause he couldn't bear the relationship anymore!")
await chain.abatch([{"topic": "bears"}])
[AIMessage(content='Why did the bear dissolve in water?\n\nBecause it was polar!')]
sidebar_position: 1.5
title:异步流事件(beta)
异步流事件(beta)
事件流是一个beta API,可能会根据反馈略微更改。
注意:在 langchain-core 0.2.0 中引入
目前,当使用 astream_events API 时,请确保以下所有内容都能正常工作:
- 在整个代码中尽可能使用
async
(包括异步工具等)- 如果定义自定义函数/运行器,请传递回调。
- 每当使用不是 LCEL 上的运行器时,请确保在 LLM 上调用
.astream()
而不是.ainvoke
以强制 LLM 流式传输令牌。事件参考
下面是一个参考表,显示了各种 Runnable 对象可能发出的一些事件。
表后面包含一些 Runnable 的定义。⚠️ 当流式处理时,输入的可运行对象将在输入流被完全消耗之后才可用。这意味着输入将在对应的
end
钩子而不是start
事件中可用。
事件 名称 块 输入 输出 on_chat_model_start [model name] {“messages”: [[SystemMessage, HumanMessage]]} on_chat_model_stream [model name] AIMessageChunk(content=“hello”) on_chat_model_end [model name] {“messages”: [[SystemMessage, HumanMessage]]} {“generations”: […], “llm_output”: None, …} on_llm_start [model name] {‘input’: ‘hello’} on_llm_stream [model name] ‘Hello’ on_llm_end [model name] ‘Hello human!’ on_chain_start format_docs on_chain_stream format_docs “hello world!, goodbye world!” on_chain_end format_docs [Document(…)] “hello world!, goodbye world!” on_tool_start some_tool {“x”: 1, “y”: “2”} on_tool_stream some_tool {“x”: 1, “y”: “2”} on_tool_end some_tool {“x”: 1, “y”: “2”} on_retriever_start [retriever name] {“query”: “hello”} on_retriever_chunk [retriever name] {documents: […]} on_retriever_end [retriever name] {“query”: “hello”} {documents: […]} on_prompt_start [template_name] {“question”: “hello”} on_prompt_end [template_name] {“question”: “hello”} ChatPromptValue(messages: [SystemMessage, …]) 下面是上述事件相关联的声明:
format_docs
:def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
- 1
- 2
- 3
- 4
- 5
some_tool
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
- 1
- 2
- 3
- 4
prompt
:template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
- 1
- 2
- 3
所有运行器还有一个方法.astream_log()
,用于流式传输(随时发生)链/序列的所有或部分中间步骤。
这对于向用户显示进度、使用中间结果或调试链很有用,可以流式传输所有步骤(默认)或按名称、标记或元数据包含/排除步骤。
此方法产生 JSONPatch操作,按接收到的顺序应用这些操作将重建运行状态。
class LogEntry(TypedDict): id: str """子运行的ID。""" name: str """正在运行的对象的名称。""" type: str """正在运行的对象的类型,例如 prompt、chain、llm 等。""" tags: List[str] """运行的标签列表。""" metadata: Dict[str, Any] """运行的元数据的键值对。""" start_time: str """运行开始时的 ISO-8601 时间戳。""" streamed_output_str: List[str] """此运行流式传输的 LLM 令牌列表(如果适用)。""" final_output: Optional[Any] """此运行的最终输出。 仅在运行成功完成后才可用。""" end_time: Optional[str] """运行结束时的 ISO-8601 时间戳。 仅在运行成功完成后才可用。""" class RunState(TypedDict): id: str """运行的ID。""" streamed_output: List[Any] """由 Runnable.stream() 流式传输的输出块列表。""" final_output: Optional[Any] """运行的最终输出,通常是对 streamed_output 进行聚合(`+`)的结果。 仅在运行成功完成后才可用。""" logs: Dict[str, LogEntry] """运行名称到子运行的映射。如果提供了过滤器,此列表将只包含与过滤器匹配的运行。"""
流式传输JSONPatch块
当使用RunnableParallel
(通常写成字典形式)时,它会并行执行每个元素。
from langchain_core.runnables import RunnableParallel
chain1 = ChatPromptTemplate.from_template("告诉我一个关于{topic}的笑话") | model
chain2 = (
ChatPromptTemplate.from_template("写一首关于{topic}的短诗(2行)")
| model
)
combined = RunnableParallel(joke=chain1, poem=chain2)
%%time
chain1.invoke({"topic": "熊"})
CPU times: total: 31.2 ms
Wall time: 1.12 s
AIMessage(content='为什么熊不喜欢在雨天出去玩?因为它怕变成“湿熊”啦!哈哈哈。')
%%time
chain2.invoke({"topic": "熊"})
CPU times: total: 31.2 ms
Wall time: 1.57 s
AIMessage(content='森林里的熊,毛茸茸又温暖。')
%%time
combined.invoke({"topic": "熊"})
CPU times: total: 62.5 ms
Wall time: 1.25 s
{'joke': AIMessage(content='为什么熊不喜欢和蜜蜂比赛跑步?因为他们总是会被蜜蜂“蜇”在后面! 本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。