当前位置:   article > 正文

LangChain表达式LCEL(三)_langchain stream

langchain stream

接口(Interface)

为了尽可能简化创建自定义链的过程,我们实现了一个“Runnable”协议,这是一个标准接口,可以轻松定义自定义链并以标准方式调用它们。

标准接口包括:

  • stream:流式返回响应的块
  • invoke:在输入上调用链
  • batch:在输入列表上调用链

这些方法对应的异步方法

  • astream:异步流式返回响应的块
  • ainvoke:异步在输入上调用链
  • abatch:异步在输入列表上调用链
  • astream_log:异步流式返回中间步骤,以及最终响应
  • astream_eventsbeta 异步流式返回链中发生的事件(在 langchain-core 0.1.14 中引入)

输入类型输出类型因组件而异:

组件输入类型输出类型
Prompt字典PromptValue
ChatModel单个字符串、聊天消息列表或 PromptValueChatMessage
LLM单个字符串、聊天消息列表或 PromptValue字符串
OutputParserLLM 或 ChatModel 的输出取决于解析器
Retriever单个字符串文档列表
Tool单个字符串或字典,取决于工具取决于工具

所有可运行对象都公开输入和输出的模式以检查输入和输出:

  • input_schema: 从 Runnable 的结构动态生成的输入 Pydantic 模型
  • output_schema: 从 Runnable 的结构动态生成的输出 Pydantic 模型

示例:创建一个简单的PromptTemplate + ChatModel链

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
chain = prompt | model
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

输入模式

Runnable 接受的输入的描述。 这是从任何 Runnable 的结构动态生成的 Pydantic 模型,可以调用 .schema() 来获取其 JSONSchema 表示形式。

# 链的输入模式是其第一个部分(prompt)的输入模式。
chain.input_schema.schema()

prompt.input_schema.schema()

model.input_schema.schema()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

输出模式

对由可运行对象产生的输出的描述。 这是根据任何可运行对象的结构动态生成的 Pydantic 模型,可以调用 .schema() 来获取 JSONSchema 表示形式。

# 链的输出模式是其最后一部分的输出模式,本例中是 ChatModel,它输出一个 ChatMessage
chain.output_schema.schema()
  • 1
  • 2

Stream

for s in chain.stream({"topic": "bears"}):
    print(s.content, end="", flush=True)
  • 1
  • 2

输出:

Why don't bears wear shoes?

Because they prefer bear feet!
  • 1
  • 2
  • 3

Invoke

chain.invoke({"topic": "bears"})
  • 1
AIMessage(content='Why did the bear bring a flashlight to the party? \n\nBecause he heard it was going to be a "beary" dark and wild night!')
  • 1

Batch

chain.batch([{"topic": "bears"}, {"topic": "cats"}])
  • 1
[AIMessage(content='Why did the bear bring a flashlight to the party? \nBecause he wanted to be the "light" of the party!'),
 AIMessage(content='Why was the cat sitting on the computer?\nBecause it wanted to keep an eye on the mouse!')]
  • 1
  • 2

还可以使用 max_concurrency 参数设置并发请求数

chain.batch([{"topic": "bears"}, {"topic": "cats"}], config={"max_concurrency": 5})
  • 1

Async Stream

async for s in chain.astream({"topic": "bears"}):
    print(s.content, end="", flush=True)
  • 1
  • 2
Why did the bear dissolve in water?

Because it was polar!
  • 1
  • 2
  • 3

Async Invoke

await chain.ainvoke({"topic": "bears"})
  • 1
AIMessage(content="Why did the bear break up with his girlfriend?\n\nBecause he couldn't bear the relationship anymore!")
  • 1

Async Batch

await chain.abatch([{"topic": "bears"}])
  • 1
[AIMessage(content='Why did the bear dissolve in water?\n\nBecause it was polar!')]
  • 1

sidebar_position: 1.5

title:异步流事件(beta)

异步流事件(beta)

事件流是一个beta API,可能会根据反馈略微更改。

注意:在 langchain-core 0.2.0 中引入

目前,当使用 astream_events API 时,请确保以下所有内容都能正常工作:

  • 在整个代码中尽可能使用async(包括异步工具等)
  • 如果定义自定义函数/运行器,请传递回调。
  • 每当使用不是 LCEL 上的运行器时,请确保在 LLM 上调用.astream() 而不是.ainvoke 以强制 LLM 流式传输令牌。

事件参考

下面是一个参考表,显示了各种 Runnable 对象可能发出的一些事件。
表后面包含一些 Runnable 的定义。

⚠️ 当流式处理时,输入的可运行对象将在输入流被完全消耗之后才可用。这意味着输入将在对应的end钩子而不是start事件中可用。

事件名称输入输出
on_chat_model_start[model name]{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream[model name]AIMessageChunk(content=“hello”)
on_chat_model_end[model name]{“messages”: [[SystemMessage, HumanMessage]]}{“generations”: […], “llm_output”: None, …}
on_llm_start[model name]{‘input’: ‘hello’}
on_llm_stream[model name]‘Hello’
on_llm_end[model name]‘Hello human!’
on_chain_startformat_docs
on_chain_streamformat_docs“hello world!, goodbye world!”
on_chain_endformat_docs[Document(…)]“hello world!, goodbye world!”
on_tool_startsome_tool{“x”: 1, “y”: “2”}
on_tool_streamsome_tool{“x”: 1, “y”: “2”}
on_tool_endsome_tool{“x”: 1, “y”: “2”}
on_retriever_start[retriever name]{“query”: “hello”}
on_retriever_chunk[retriever name]{documents: […]}
on_retriever_end[retriever name]{“query”: “hello”}{documents: […]}
on_prompt_start[template_name]{“question”: “hello”}
on_prompt_end[template_name]{“question”: “hello”}ChatPromptValue(messages: [SystemMessage, …])

下面是上述事件相关联的声明:

format_docs

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)
  • 1
  • 2
  • 3
  • 4
  • 5

some_tool

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}
  • 1
  • 2
  • 3
  • 4

prompt

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
  • 1
  • 2
  • 3

异步流中间步骤

所有运行器还有一个方法.astream_log(),用于流式传输(随时发生)链/序列的所有或部分中间步骤。

这对于向用户显示进度、使用中间结果或调试链很有用,可以流式传输所有步骤(默认)或按名称、标记或元数据包含/排除步骤。

此方法产生 JSONPatch操作,按接收到的顺序应用这些操作将重建运行状态。

class LogEntry(TypedDict):
    id: str
    """子运行的ID。"""
    name: str
    """正在运行的对象的名称。"""
    type: str
    """正在运行的对象的类型,例如 prompt、chain、llm 等。"""
    tags: List[str]
    """运行的标签列表。"""
    metadata: Dict[str, Any]
    """运行的元数据的键值对。"""
    start_time: str
    """运行开始时的 ISO-8601 时间戳。"""

    streamed_output_str: List[str]
    """此运行流式传输的 LLM 令牌列表(如果适用)。"""
    final_output: Optional[Any]
    """此运行的最终输出。
    仅在运行成功完成后才可用。"""
    end_time: Optional[str]
    """运行结束时的 ISO-8601 时间戳。
    仅在运行成功完成后才可用。"""


class RunState(TypedDict):
    id: str
    """运行的ID。"""
    streamed_output: List[Any]
    """由 Runnable.stream() 流式传输的输出块列表。"""
    final_output: Optional[Any]
    """运行的最终输出,通常是对 streamed_output 进行聚合(`+`)的结果。
    仅在运行成功完成后才可用。"""

    logs: Dict[str, LogEntry]
    """运行名称到子运行的映射。如果提供了过滤器,此列表将只包含与过滤器匹配的运行。"""
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

流式传输JSONPatch块

并行处理

当使用RunnableParallel(通常写成字典形式)时,它会并行执行每个元素。

from langchain_core.runnables import RunnableParallel

chain1 = ChatPromptTemplate.from_template("告诉我一个关于{topic}的笑话") | model
chain2 = (
    ChatPromptTemplate.from_template("写一首关于{topic}的短诗(2行)")
    | model
)
combined = RunnableParallel(joke=chain1, poem=chain2)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
%%time
chain1.invoke({"topic": "熊"})
  • 1
  • 2
CPU times: total: 31.2 ms
Wall time: 1.12 s
AIMessage(content='为什么熊不喜欢在雨天出去玩?因为它怕变成“湿熊”啦!哈哈哈。')
  • 1
  • 2
  • 3
%%time
chain2.invoke({"topic": "熊"})
  • 1
  • 2
CPU times: total: 31.2 ms
Wall time: 1.57 s
AIMessage(content='森林里的熊,毛茸茸又温暖。')
  • 1
  • 2
  • 3
%%time
combined.invoke({"topic": "熊"})
  • 1
  • 2
CPU times: total: 62.5 ms
Wall time: 1.25 s
{'joke': AIMessage(content='为什么熊不喜欢和蜜蜂比赛跑步?因为他们总是会被蜜蜂“蜇”在后面!         本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签