赞
踩
LangGraph是一个基于图状态机构建复杂、稳定的AI agent的库。本文介绍LangGraph的核心概念。
尽管可能每个人对AI Agent由什么构成的定义不同,这里将agent看成是利用语言模型来控制工作流(workflow)循环和采取行动的系统。原型的LLM agent使用ReAct式设计,将LLM应用于驱动一个基本循环,具体步骤如下:
虽然LLM agent在这方面效果出人意料,但单纯的agent循环在大规模情况下并不能提供用户期待的可靠性。它们具有美丽的随机性。设计精良的系统充分利用这种随机性,使得该系统能容忍LLM输出中的错误。
AI设计模式应该应用良好工程实践,包括:
LangGraph的StateGraph
抽象支持这些需求,提供比AgentExecutor
框架更低级别的API,可以完全控制何时何地以及如何使用AI。
LangGraph将Agent的工作流建模为状态机,可以使用三个关键组件定义Agent的行为:
State
: 表示应用当前快照的共享数据结构,可以是任何Python类型,通常是TypedDict
或Pydantic
的BaseModel
;Node
: 编码Agent逻辑的Python函数,接收当前状态,执行一些计算或副作用(side-effect),返回更新后的状态;Edge
: 根据当前状态确定下一个要执行的节点的控制流规则,可以是条件分支或固定转换;通过组合节点和边,可以创建随时间演变状态的复杂循环工作流,LangGraph的强大在于它如何管理这些状态。
LangGraph的底层图算法使用消息传递来定义通信程序,当一个节点完成时,它沿着一个或多个边发送消息给其他节点。这些节点运行它们的函数,将结果消息传递给下一组节点,以此类推。受Pregel启发,程序按离散的超步(super-step)进行,这些超步在概念上都是并行执行。
当运行图时,所有的节点都处于非活动(inactive)状态,每当入边(incoming edge/channel)接收到新消息(状态)时,节点变为活动(active)状态。运行函数,并响应更新。在每个超步结束时,每个节点都会通过将自身标记为非活动状态来投票停止(vote to halt)。如果没有更多的消息传入,当所有节点都处于非活动状态且无消息在传输时图终止。
节点通常是Python函数,第一个位置参数都是状态,第二个可选的位置参数是config
,包含配置参数(例如thread_id
)。使用add_node
方法将这些节点添加到图中。
from langchain_core.runnables import RunnableConfig from langgraph.graph import END, START, StateGraph builder = StateGraph(dict) def my_node(state: dict, config: RunnableConfig): print("In node: ", config["configurable"]["user_id"]) return {"results": f"Hello, {state['input']}!"} # The second argument is optional def my_other_node(state: dict): return state builder.add_node("my_node", my_node) builder.add_node("other_node", my_other_node) builder.add_edge(START, "my_node") builder.add_edge("my_node", "other_node") builder.add_edge("other_node", END) graph = builder.compile() graph.invoke({"input": "Will"}, {"configurable": {"user_id": "abcd-123"}}) # In node: abcd-123 # {'results': 'Hello, Will!'}
在底层,函数会被转换为RunnableLambda
,它添加了批处理和异步支持,以及跟踪和调试功能。
边定义了逻辑的路由方式和图如何决定停止。类似节点,它们接收图的当前状态并返回一个值。默认该值是要将状态发送到下一个节点或节点的名称。所有的这些节点将作为下一个超步的一部分并行运行。
如果想重用边,可以选择提供一个字典,将边的输出映射到下一个节点的名称。
如果希望始终从节点A到节点B,可以直接使用add_edge
方法。
如果要选择性地路由到一个或多个边(或选择性地终止),可以使用add_conditional_edges
方法。
如果一个节点有多个出边,所有这些目标节点将作为下一个超步的一部分并行执行。
LangGraph引入了状态管理的两个关键概念:状态模式(state schema)和reducer。
状态模式定义了提供给图中每个节点的对象的类型。
Reducer定义了如何将节点输出应用于当前状态。例如,可以使用reducer将心的对话响应合并到对话历史记录中,或将多个Agent节点的输出平均聚合在一起(average together)。
下面通过一个示例来看看reducer的工作原理,比较下面两个状态。
from typing import Annotated from typing_extensions import TypedDict from langgraph.graph import END, START, StateGraph class StateA(TypedDict): value: int builder = StateGraph(StateA) builder.add_node("my_node", lambda state: {"value": 1}) # 更新value为1 builder.add_edge(START, "my_node") builder.add_edge("my_node", END) graph = builder.compile() graph.invoke({"value": 5})
和StateB:
from typing import Annotated from typing_extensions import TypedDict from langgraph.graph import END, START, StateGraph def add(existing: int, new: int): return existing + new class StateB(TypedDict): # 高亮的新行 value: Annotated[int, add] builder = StateGraph(StateB) builder.add_node("my_node", lambda state: {"value": 1}) # 更新为 5 + 1 builder.add_edge(START, "my_node") builder.add_edge("my_node", END) graph = builder.compile() graph.invoke({"value": 5})
在StateA中,结果是1
。因为状态的默认reducer是直接覆盖。在StateB中,结果是6
,因为我们将add
函数创建为reducer,它接收现有状态和状态更新,并返回更新后的值。
虽然我们通常使用TypedDict
作为State的state schema,实际上可以是几乎任何类型,下面的代码也是有效的:
# Analogous to StateA above builder = StateGraph(int) builder.add_node("my_node", lambda state: 1) builder.add_edge(START, "my_node") builder.add_edge("my_node", END) builder.compile().invoke(5) # Analogous to StateB def add(left, right): return left + right builder = StateGraph(Annotated[int, add]) builder.add_node("my_node", lambda state: 1) builder.add_edge(START, "my_node") builder.add_edge("my_node", END) graph = builder.compile() graph.invoke(5)
这意味着可以使用Pydantic BaseModel作为图的状态,可以添加默认值和额外的数据验证。
当构建像ChatGPT这样的聊天机器人时,状态可能仅仅是一个聊天消息列表。这是MessageGraph
(StateGrpah
的轻量包装器)使用的状态,仅比下面的稍微复杂一点:
builder = StateGraph(Annotated[list, add])
在图中使用共享状态涉及一些设计的权衡。共享一个类型化状态提供了很多与构建AI工作流相关的优势,包括:
update_state
)变得容易;任何智能系统都需要记忆才能运作。AI智能体也是一样,需要跨一个或多个时间范围(timeframe)的记忆:
最后一种记忆形式涵盖了很多内容(个性化、优化、持续学习等),超出了本次的内容。
前两种记忆形式通过基于检查点的StateGraph的API来支持。
检查点(checkpoint)代表应用程序和用户之间进行的多轮互动中线程的状态。在单次运行中创建的检查点将具有一组在从该状态开始时执行的下一个节点。在给定运行结束时创建的检查点是相同的,只是没有下一个节点可以转换(正在等待用户输入)。
检查点支持聊天记忆等功能,可以tag并持久化系统中已经采取的每个状态。
Agent的每一步都被设为检查点,在代理未能实现你的目标而遇到错误的情况下,可以随时从其中一个保存的检查点恢复它的任务。
这可以支持human-in-the-loop工作流,在执行给定节点之前或之后,可以中断图的执行将控制权交给用户,这个用户可以立即回复,也可以之后回复。你的工作流都可以随时恢复。
检查点保存在一个thread_id
下,来支持用户和系统之间的多轮交互。在如何配置图以添加多轮记忆支持方面没有任何区别,因为检查点工作在整个过程中都是相同的。
如果要在多轮对话中保留一部分状态并将一些状态视为"短暂的",你可以在图的最终节点中清除相关状态。
使用检查点就像调用compile(checkpointer=my_checkpointer)
一样简单,然后在其可配置参数中使用一个thread_id
来调用它。
线程表示图的不同会话,它们将状态检查点组织在离散会话中,以便在应用中支持多用户对话。
一个典型的聊天机器人应用为每个用户创建了多个线程,每个线程代表一次对话,都具有自己的持久化的聊天记录和其他状态。线程内的检查点可以根据需要进行回放和分支。
当一个StateGraph
基于checkpointer
编译,每次调用图时都需要通过配置(configuration)提供一个thread_id
。
对于任何给定的图部署,你可能希望有一些可在运行时控制的可配置值。这些与图输入不同,因为它们不是要视为状态变量。
一个常见的例子是对话线程thread_id
、用户user_id
、选择使用哪个LLM、在检索器中返回多少个文档等。虽然你可以将这些值传递到状态中,但最好将其与常规数据流分开。
我们来看一个例子,看多轮记忆是如何工作的。
from typing import Annotated from typing_extensions import TypedDict from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import END, START, StateGraph def add(left, right): return left + right class State(TypedDict): total: Annotated[int, add] turn: str builder = StateGraph(State) # 不存在检查点 builder.add_node("add_one", lambda x: {"total": 1}) # 默认初始或增加1 builder.add_edge(START, "add_one") # 进入add_one builder.add_edge("add_one", END) # 结束 memory = MemorySaver() graph = builder.compile(checkpointer=memory) # 基于checkpointer编译 thread_id = "some-thread" config = {"configurable": {"thread_id": thread_id}} # 配置thread_id result = graph.invoke({"total": 1, "turn": "First Turn"}, config) # 第一次运行,累加到2 result2 = graph.invoke({"turn": "Next Turn"}, config) # 累加到3,默认传入total=1 result3 = graph.invoke({"total": 5}, config) # 累加 5+1,变成9 result4 = graph.invoke({"total": 5}, {"configurable": {"thread_id": "new-thread-id"}}) # 累加到6,因为是新的对话,从1开始累加
对于第一次运行,不存在检查点,因此图是在原始输入上运行的。total
值从1增加到2,turn
设置为First Turn
。
对于第二次运行,用户更新了turn
,但没有更新total
!由于我们是从状态中加载的,先前的结果增加了1(在add_one
节点中),然后turn
被用户覆盖。
对于第三次运行,turn
保持不变,因为它是从检查点加载的,而没有被用户覆盖。total
增加了用户提供的值,因为这个值是通过add
函数reduce(更新)的。
对于第四次运行,使用了一个新的线程id,但没有找到检查点,所以结果仅仅是默认的total
增加1
。
这种面向用户的行为等同于没有检查点情况下运行以下内容:
graph = builder.compile()
result = graph.invoke({"total": 1, "turn": "First Turn"})
result2 = graph.invoke({**result, "turn": "Next Turn"})
result3 = graph.invoke({**result2, "total": result2["total"] + 5})
result4 = graph.invoke({"total": 5})
下面我们看复杂一点的例子,通过在上面的玩具示例中添加一个条件边。
from typing import Annotated, Literal from typing_extensions import TypedDict from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import END, START, StateGraph def add(left, right): return left + right class State(TypedDict): total: Annotated[int, add] builder = StateGraph(State) builder.add_node("add_one", lambda x: {"total": 1}) # 新增1 builder.add_node("double", lambda x: {"total": x["total"]}) # 新增现在的x值,即翻倍 builder.add_edge(START, "add_one") # 定义一个路由 route -> double 也可能 route -> end def route(state: State) -> Literal["double", "__end__"]: if state["total"] < 6: return "double" # 路由到double return "__end__" # 结束 builder.add_conditional_edges("add_one", route) # add_one -> route builder.add_edge("double", "add_one") # double -> add_one memory = MemorySaver() graph = builder.compile(checkpointer=memory)
然后第一次调用:
thread_id = "some-thread"
config = {"configurable": {"thread_id": thread_id}}
for step in graph.stream({"total": 1}, config, stream_mode="debug"):
print(step["step"], step["type"], step["payload"].get("values"))
# 0 checkpoint {'total': 1} 将输入的1增加到初始值0中,得到1
# 1 checkpoint {'total': 2} 进入 add_one ,新增了1
# 2 checkpoint {'total': 4} 进入route,到double,翻倍
# 3 checkpoint {'total': 5} double返回到add_one,新增了1
# 4 checkpoint {'total': 10} 进入route,到double,翻倍
# 5 checkpoint {'total': 11} double返回到add_one,新增了1,然后进入route到end
下面详细介绍执行过程:
1
加到现有值0
中。 在这一超步结束时,总量(total)为1
。add_one
节点,返回1
。1
)中。 状态现在是2
。route
,由于值小于6,继续到double
节点。doube
输出现有的总量2
并返回。 然后将其加到现有状态中。 状态现在是4
。add_one
返回(5
),检查条件边并继续进行,因为它小于6
。double
后,总量为10
。add_one
,总量为11
,检查条件边,由于大于6,程序终止。第二轮调用,我们使用同样的配置:
for step in graph.stream(
{"total": -2, "turn": "First Turn"}, config, stream_mode="debug"
):
print(step["step"], step["type"], step["payload"].get("values"))
# 7 checkpoint {'total': 9} 输入为-2,11-2=9
# 8 checkpoint {'total': 10} 进入add_one,增加了1,变成10
add
reducer 将总量从0
更改为-2
。9=-2+11
。add_one
节点以此状态被调用。 它返回10
。10
。route
,由于值大于6
,终止程序。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。