! pip install langchain_community tiktoken langchain-openai langchainhub chromadb langchain langgraph
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# List of blog post URLs to load
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

# Load documents from the URLs
docs = [WebBaseLoader(url).load() for url in urls]
# Flatten the per-URL lists into a single list of documents
docs_list = [item for sublist in docs for item in sublist]

# Split the documents with a recursive character text splitter
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=100,   # size of each chunk, in tokens
    chunk_overlap=50  # overlap between consecutive chunks
)
doc_splits = text_splitter.split_documents(docs_list)

# Add the chunks to a vector database
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    collection_name="rag-chroma",  # collection name in the vector store
    embedding=OpenAIEmbeddings(),  # OpenAI embedding model
)
# Expose the vector store as a retriever
retriever = vectorstore.as_retriever()
from langchain.tools.retriever import create_retriever_tool

# Create a retriever tool the agent can call
tool = create_retriever_tool(
    retriever,
    "retrieve_blog_posts",  # tool name
    "Search and return information about Lilian Weng blog posts.",  # tool description
)
tools = [tool]
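Before wiring the tool into a graph, it can be useful to call it directly. A quick sanity check, assuming the LangChain 0.1-era API in which tools are runnables (the query string is just an example, not from the original tutorial):

# Illustrative check: a retriever tool takes a query string and
# returns the matching chunks concatenated as text
print(tool.invoke("What is agent memory?")[:300])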
from langgraph.prebuilt import ToolExecutor
# Create a tool executor that can run the tools above
tool_executor = ToolExecutor(tools)
We will now define a graph. A state object is passed to each node. Our state will be a list of messages, and each node in the graph appends information to that list.
import operator
from typing import Annotated, Sequence, TypedDict

from langchain_core.messages import BaseMessage

# The agent state: a list of messages that nodes append to
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
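The operator.add annotation is what makes updates additive: when a node returns {"messages": [new_message]}, LangGraph merges the update into the existing list instead of replacing it. A minimal illustration of the reducer itself, in plain Python with invented messages:

from langchain_core.messages import AIMessage, HumanMessage

existing = [HumanMessage(content="What is agent memory?")]
update = [AIMessage(content="Let me check the blog posts.")]
# Effectively what the graph does with each node's return value
print(operator.add(existing, update))  # a two-message list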
Each node will:

1/ either be a function or a runnable, and
2/ modify the state.

Edges choose the next node to call. We can lay out an agentic RAG graph like this:
import json
import operator
from typing import Annotated, Sequence, TypedDict

from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain.tools.render import format_tool_to_openai_function
from langchain_core.messages import BaseMessage, FunctionMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolInvocation

### Edges


def should_retrieve(state):
    """
    Decides whether the agent should retrieve more information or end the process.

    This function checks the last message in the state for a function call. If a function
    call is present, the process continues to retrieve information. Otherwise, it ends
    the process.

    Args:
        state (messages): The current state of the agent, including all messages.

    Returns:
        str: A decision to either "continue" the retrieval process or "end" it.
    """
    print("---DECIDE TO RETRIEVE---")
    messages = state["messages"]
    last_message = messages[-1]

    # If there is no function call, then we finish
    if "function_call" not in last_message.additional_kwargs:
        print("---DECISION: DO NOT RETRIEVE / DONE---")
        return "end"
    # Otherwise there is a function call, so we continue
    else:
        print("---DECISION: RETRIEVE---")
        return "continue"


def check_relevance(state):
    """
    Determines whether the agent should continue based on the relevance of retrieved documents.

    This function checks if the last message in the conversation is of type FunctionMessage,
    indicating that document retrieval has been performed. It then evaluates the relevance
    of these documents to the user's initial question using a predefined model and output
    parser. If the documents are relevant, the conversation is considered complete.
    Otherwise, the retrieval process is continued.

    Args:
        state (messages): The current state of the conversation, including all messages.

    Returns:
        str: A directive to either "end" the conversation if relevant documents are found,
        or "continue" the retrieval process.
    """
    print("---CHECK RELEVANCE---")

    # Output schema for the grader
    class FunctionOutput(BaseModel):
        binary_score: str = Field(description="Relevance score 'yes' or 'no'")

    # Create an instance of the PydanticOutputParser
    parser = PydanticOutputParser(pydantic_object=FunctionOutput)

    # Get the format instructions from the output parser
    format_instructions = parser.get_format_instructions()

    # Create a prompt template with format instructions and the query
    prompt = PromptTemplate(
        template="""You are a grader assessing relevance of retrieved docs to a user question. \n
        Here are the retrieved docs:
        \n ------- \n
        {context}
        \n ------- \n
        Here is the user question: {question}
        If the docs contain keyword(s) in the user question, then score them as relevant. \n
        Give a binary score 'yes' or 'no' score to indicate whether the docs are relevant to the question. \n
        Output format instructions: \n {format_instructions}""",
        input_variables=["context", "question"],
        partial_variables={"format_instructions": format_instructions},
    )

    model = ChatOpenAI(temperature=0, model="gpt-4-0125-preview")

    chain = prompt | model | parser

    messages = state["messages"]
    last_message = messages[-1]

    score = chain.invoke(
        {"question": messages[0].content, "context": last_message.content}
    )

    # If relevant
    if score.binary_score == "yes":
        print("---DECISION: DOCS RELEVANT---")
        return "yes"
    else:
        print("---DECISION: DOCS NOT RELEVANT---")
        print(score.binary_score)
        return "no"


### Nodes


# Define the function that calls the model
def call_model(state):
    """
    Invokes the agent model to generate a response based on the current state.

    This function calls the agent model to generate a response to the current conversation
    state. The response is added to the state's messages.

    Args:
        state (messages): The current state of the agent, including all messages.

    Returns:
        dict: The updated state with the new message added to the list of messages.
    """
    print("---CALL AGENT---")
    messages = state["messages"]
    model = ChatOpenAI(temperature=0, streaming=True, model="gpt-4-0125-preview")
    functions = [format_tool_to_openai_function(t) for t in tools]
    model = model.bind_functions(functions)
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}


# Define the function to execute tools
def call_tool(state):
    """
    Executes a tool based on the last message's function call.

    This function is responsible for executing a tool invocation based on the function
    call specified in the last message. The result from the tool execution is added to
    the conversation state as a new message.

    Args:
        state (messages): The current state of the agent, including all messages.

    Returns:
        dict: The updated state with the new function message added to the list of messages.
    """
    print("---EXECUTE RETRIEVAL---")
    messages = state["messages"]
    # Based on the continue condition,
    # we know the last message involves a function call
    last_message = messages[-1]
    # We construct a ToolInvocation from the function_call
    action = ToolInvocation(
        tool=last_message.additional_kwargs["function_call"]["name"],
        tool_input=json.loads(
            last_message.additional_kwargs["function_call"]["arguments"]
        ),
    )
    # We call the tool_executor and get back a response
    response = tool_executor.invoke(action)
    # We use the response to create a FunctionMessage
    function_message = FunctionMessage(content=str(response), name=action.tool)
    # We return a list, because this will get added to the existing list
    return {"messages": [function_message]}
from langgraph.graph import END, StateGraph
# Define a new graph
workflow = StateGraph(AgentState)
# Define the nodes we will cycle between
workflow.add_node("agent", call_model) # agent
workflow.add_node("action", call_tool) # retrieval
# Call agent node to decide to retrieve or not
workflow.set_entry_point("agent")

# Decide whether to retrieve
workflow.add_conditional_edges(
    "agent",
    # Assess agent decision
    should_retrieve,
    {
        # Call tool node
        "continue": "action",
        "end": END,
    },
)

# Edges taken after the `action` node is called.
workflow.add_conditional_edges(
    "action",
    # Assess agent decision
    check_relevance,
    {
        # Call agent node
        "yes": "agent",
        "no": END,  # placeholder
    },
)

# Compile
app = workflow.compile()
import pprint

from langchain_core.messages import HumanMessage

inputs = {
    "messages": [
        HumanMessage(
            content="What are the types of agent memory based on Lilian Weng's blog post?"
        )
    ]
}

for output in app.stream(inputs):
    for key, value in output.items():
        pprint.pprint(f"Output from node '{key}':")
        pprint.pprint("---")
        pprint.pprint(value, indent=2, width=80, depth=None)
    pprint.pprint("\n---\n")
LangChain is a framework for building applications on top of large language models (LLMs). It provides integrations with a wide range of data sources and tools, and it simplifies building complex conversational agents and retrieval systems.
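The prompt | model | parser pipelines used throughout this post are LangChain's expression language. A minimal sketch of the pattern, with an illustrative prompt of our own:

from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# A minimal chain: prompt -> model -> plain-string output
prompt = PromptTemplate.from_template("Summarize in one sentence: {text}")
chain = prompt | ChatOpenAI(temperature=0) | StrOutputParser()
print(chain.invoke({"text": "LangChain composes LLM calls with tools and data sources."}))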
LangGraph is an extension library built on LangChain for creating and operating graph-structured agent systems. It lets developers define state and graph nodes, making data processing and agent logic more flexible and modular.
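A toy sketch of the core idea, independent of the RAG example above (names are invented for illustration, assuming the same StateGraph API used earlier):

from typing import TypedDict

from langgraph.graph import END, StateGraph

class ToyState(TypedDict):
    text: str

def shout(state):
    # Without a reducer annotation, returning a key replaces its value
    return {"text": state["text"].upper()}

toy = StateGraph(ToyState)
toy.add_node("shout", shout)
toy.set_entry_point("shout")
toy.add_edge("shout", END)
print(toy.compile().invoke({"text": "hello"}))  # {'text': 'HELLO'}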
OpenAI Embeddings map text to high-dimensional vector representations via a neural network. These vectors can be used to measure similarity between texts, enabling efficient information retrieval and classification.
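For example, a rough sketch of comparing two phrases with the same OpenAIEmbeddings class used above (the phrases are arbitrary):

import numpy as np
from langchain_openai import OpenAIEmbeddings

emb = OpenAIEmbeddings()
v1 = np.array(emb.embed_query("agent memory"))
v2 = np.array(emb.embed_query("how an agent remembers things"))
# Cosine similarity: closer to 1.0 means more semantically similar
print(float(v1 @ v2 / (np.linalg.norm(v1) * np.linalg.norm(v2))))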
Chroma is a vector database for storing and retrieving high-dimensional vector representations. It supports efficient similarity search and is compatible with many embedding models.
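Against the vectorstore built earlier, a direct similarity search looks like this (a quick sketch; k is the number of chunks to return):

# Return the 3 chunks most similar to the query, using the `vectorstore` built above
for doc in vectorstore.similarity_search("types of agent memory", k=3):
    print(doc.page_content[:80])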
The recursive character text splitter is a text-preprocessing tool that splits long documents into smaller chunks. By overlapping adjacent chunks it preserves contextual continuity while making long texts easier to process.
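A self-contained sketch of the splitter on a short string (character-based sizes here, unlike the token-based from_tiktoken_encoder variant used earlier):

from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(chunk_size=20, chunk_overlap=5)
# Prints a list of overlapping substrings, each at most ~20 characters
print(splitter.split_text("LangGraph builds agent workflows as graphs of nodes and edges."))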
This post showed how to build a retrieval agent with LangGraph and LangChain. By defining a document loader, a text splitter, and a vector database, we implemented an agent tool that retrieves information from the specified web pages. The system demonstrates the power of large language models for information retrieval while providing a flexible framework that is easy to extend and customize.