
Example Code for Building LangChain Applications: 36. A Tutorial on Implementing a Retrieval Agent with LangGraph

LangGraph Retrieval Agent

We can implement a retrieval agent in LangGraph.

! pip install langchain_community tiktoken langchain-openai langchainhub chromadb langchain langgraph

Retriever

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# List of web page URLs to load
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]

# Load the documents from the URLs
docs = [WebBaseLoader(url).load() for url in urls]
# Flatten the nested lists into a single list of documents
docs_list = [item for sublist in docs for item in sublist]

# Split the text with a recursive character text splitter
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=100,  # size of each text chunk, in tokens
    chunk_overlap=50,  # overlap between adjacent chunks
)
doc_splits = text_splitter.split_documents(docs_list)

# Add the chunks to the vector store
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    collection_name="rag-chroma",  # name of the vector store collection
    embedding=OpenAIEmbeddings(),  # use OpenAI's embedding model
)
# Expose the vector store as a retriever
retriever = vectorstore.as_retriever()
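As a quick sanity check (not part of the original tutorial), you can query the retriever directly; the query string below is a hypothetical example, and `invoke` assumes a langchain-core version where retrievers implement the Runnable interface:

# Optional sanity check: fetch documents for a sample query
sample_docs = retriever.invoke("What is task decomposition for LLM agents?")
print(len(sample_docs))
print(sample_docs[0].page_content[:200])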
from langchain.tools.retriever import create_retriever_tool

# Create the retrieval tool
tool = create_retriever_tool(
    retriever,
    "retrieve_blog_posts",  # tool name
    "Search and return information about Lilian Weng blog posts.",  # tool description
)

tools = [tool]

from langgraph.prebuilt import ToolExecutor

# Create the tool executor
tool_executor = ToolExecutor(tools)
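As a sketch of what the graph will do later (not in the original code), the executor can be exercised directly; the query below is hypothetical and assumes the retriever tool's single `query` argument:

from langgraph.prebuilt import ToolInvocation

# Hypothetical direct invocation of the retrieval tool via the executor
action = ToolInvocation(tool="retrieve_blog_posts", tool_input={"query": "agent memory"})
print(tool_executor.invoke(action)[:300])  # the tool returns the retrieved docs as one string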

Agent State

We will define a graph.

A state object is passed to each node.

Our state will be a list of messages.

Each node in the graph appends to it.

import operator
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage

# Define the agent state, which holds the list of messages
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
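The `operator.add` annotation is the reducer LangGraph uses to merge each node's return value into the state: new messages are appended to the existing list rather than replacing it. A minimal illustration (not part of the original code):

from langchain_core.messages import AIMessage, HumanMessage

# What the reducer does when a node returns {"messages": [new_msg]}
existing = [HumanMessage(content="What are the types of agent memory?")]
update = [AIMessage(content="Let me look that up.")]
merged = operator.add(existing, update)  # equivalent to existing + update
print(len(merged))  # 2 -- messages accumulate as the graph runs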

Nodes and Edges

Each node will:
1/ Either be a function or a runnable.
2/ Modify the state.
Edges choose which node to call next.
We can lay out an agentic RAG graph like this:
[Figure: the agent decides whether to retrieve; if so, an action node runs the retriever, and a relevance check routes back to the agent or ends the run.]

import json
import operator
from typing import Annotated, Sequence, TypedDict

from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain.tools.render import format_tool_to_openai_function
from langchain_core.messages import BaseMessage, FunctionMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolInvocation

### Edges


def should_retrieve(state):
    """
    Decides whether the agent should retrieve more information or end the process.

    This function checks the last message in the state for a function call. If a function call is
    present, the process continues to retrieve information. Otherwise, it ends the process.

    Args:
        state (messages): The current state of the agent, including all messages.

    Returns:
        str: A decision to either "continue" the retrieval process or "end" it.
    """
    print("---DECIDE TO RETRIEVE---")
    messages = state["messages"]
    last_message = messages[-1]
    # If there is no function call, then we finish
    if "function_call" not in last_message.additional_kwargs:
        print("---DECISION: DO NOT RETRIEVE / DONE---")
        return "end"
    # Otherwise there is a function call, so we continue
    else:
        print("---DECISION: RETRIEVE---")
        return "continue"


def check_relevance(state):
    """
    Determines whether the Agent should continue based on the relevance of retrieved documents.

    This function checks if the last message in the conversation is of type FunctionMessage, indicating
    that document retrieval has been performed. It then evaluates the relevance of these documents to the user's
    initial question using a predefined model and output parser. If the documents are relevant, the conversation
    is considered complete. Otherwise, the retrieval process is continued.

    Args:
        state messages: The current state of the conversation, including all messages.

    Returns:
        str: A directive to either "end" the conversation if relevant documents are found, or "continue" the retrieval process.
    """

    print("---CHECK RELEVANCE---")

    # Output
    class FunctionOutput(BaseModel):
        binary_score: str = Field(description="Relevance score 'yes' or 'no'")

    # Create an instance of the PydanticOutputParser
    parser = PydanticOutputParser(pydantic_object=FunctionOutput)

    # Get the format instructions from the output parser
    format_instructions = parser.get_format_instructions()

    # Create a prompt template with format instructions and the query
    prompt = PromptTemplate(
        template="""You are a grader assessing relevance of retrieved docs to a user question. \n 
        Here are the retrieved docs:
        \n ------- \n
        {context} 
        \n ------- \n
        Here is the user question: {question}
        If the docs contain keyword(s) in the user question, then score them as relevant. \n
        Give a binary score 'yes' or 'no' score to indicate whether the docs are relevant to the question. \n 
        Output format instructions: \n {format_instructions}""",
        input_variables=["question"],
        partial_variables={"format_instructions": format_instructions},
    )

    model = ChatOpenAI(temperature=0, model="gpt-4-0125-preview")

    chain = prompt | model | parser

    messages = state["messages"]
    last_message = messages[-1]
    score = chain.invoke(
        {"question": messages[0].content, "context": last_message.content}
    )

    # If relevant
    if score.binary_score == "yes":
        print("---DECISION: DOCS RELEVANT---")
        return "yes"

    else:
        print("---DECISION: DOCS NOT RELEVANT---")
        print(score.binary_score)
        return "no"


### Nodes


# Define the function that calls the model
def call_model(state):
    """
    Invokes the agent model to generate a response based on the current state.

    This function calls the agent model to generate a response to the current conversation state.
    The response is added to the state's messages.

    Args:
        state (messages): The current state of the agent, including all messages.

    Returns:
        dict: The updated state with the new message added to the list of messages.
    """
    print("---CALL AGENT---")
    messages = state["messages"]
    model = ChatOpenAI(temperature=0, streaming=True, model="gpt-4-0125-preview")
    functions = [format_tool_to_openai_function(t) for t in tools]
    model = model.bind_functions(functions)
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}


# Define the function to execute tools
def call_tool(state):
    """
    Executes a tool based on the last message's function call.

    This function is responsible for executing a tool invocation based on the function call
    specified in the last message. The result from the tool execution is added to the conversation
    state as a new message.

    Args:
        state (messages): The current state of the agent, including all messages.

    Returns:
        dict: The updated state with the new function message added to the list of messages.
    """
    print("---EXECUTE RETRIEVAL---")
    messages = state["messages"]
    # Based on the continue condition
    # we know the last message involves a function call
    last_message = messages[-1]
    # We construct a ToolInvocation from the function_call
    action = ToolInvocation(
        tool=last_message.additional_kwargs["function_call"]["name"],
        tool_input=json.loads(
            last_message.additional_kwargs["function_call"]["arguments"]
        ),
    )
    # We call the tool_executor and get back a response
    response = tool_executor.invoke(action)
    # print(type(response))
    # We use the response to create a FunctionMessage
    function_message = FunctionMessage(content=str(response), name=action.tool)

    # We return a list, because this will get added to the existing list
    return {"messages": [function_message]}

Graph

  • Start with the agent, call_model
  • The agent decides whether to make a function call
  • If so, call the action node to invoke the tool (the retriever)
  • Then call the agent again, with the tool output added to the messages (the state)

from langgraph.graph import END, StateGraph

# Define a new graph
workflow = StateGraph(AgentState)

# Define the nodes we will cycle between
workflow.add_node("agent", call_model)  # agent
workflow.add_node("action", call_tool)  # retrieval
# Call agent node to decide to retrieve or not
workflow.set_entry_point("agent")

# Decide whether to retrieve
workflow.add_conditional_edges(
    "agent",
    # Assess agent decision
    should_retrieve,
    {
        # Call tool node
        "continue": "action",
        "end": END,
    },
)

# Edges taken after the `action` node is called.
workflow.add_conditional_edges(
    "action",
    # Assess agent decision
    check_relevance,
    {
        # Call agent node
        "yes": "agent",
        "no": END,  # placeholder
    },
)

# Compile
app = workflow.compile()
import pprint

from langchain_core.messages import HumanMessage

inputs = {
    "messages": [
        HumanMessage(
            content="What are the types of agent memory based on Lilian Weng's blog post?"
        )
    ]
}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint.pprint(f"Output from node '{key}':")
        pprint.pprint("---")
        pprint.pprint(value, indent=2, width=80, depth=None)
    pprint.pprint("\n---\n")

Knowledge Extension

LangChain

LangChain is a framework for building applications on top of large language models (LLMs). It provides integrations with a wide range of data sources and tools and simplifies building complex conversational agents and information retrieval systems.

LangGraph

LangGraph is an extension library built on LangChain for creating and running graph-structured agent systems. It lets developers define state and graph nodes, making data processing and agent logic more flexible and modular.

OpenAI Embeddings

OpenAI Embeddings map text to high-dimensional vector representations via a neural network. These vectors can be used to measure the similarity between texts, enabling efficient information retrieval and classification.
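A minimal sketch of measuring similarity with embeddings, reusing the `OpenAIEmbeddings` class from above; the two sample sentences are hypothetical, and cosine similarity is computed by hand:

import math

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
v1, v2 = embeddings.embed_documents(
    ["Agents can plan and use tools.", "Planning and tool use are agent capabilities."]
)

# Cosine similarity: dot(v1, v2) / (|v1| * |v2|); closer to 1.0 means more similar
dot = sum(a * b for a, b in zip(v1, v2))
norm = math.sqrt(sum(a * a for a in v1)) * math.sqrt(sum(b * b for b in v2))
print(dot / norm)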

Chroma

Chroma is a vector database for storing and retrieving high-dimensional vector representations of data. It supports efficient similarity search and is compatible with a variety of embedding models.
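A minimal similarity search against the `vectorstore` built earlier; the query string is a hypothetical example:

# Return the 2 chunks most similar to the query
results = vectorstore.similarity_search("prompt engineering techniques", k=2)
for doc in results:
    print(doc.metadata.get("source"), "->", doc.page_content[:100])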

Recursive Character Text Splitter

The recursive character text splitter is a text-preprocessing tool that splits long documents into smaller chunks. It preserves contextual continuity while making long texts more efficient to process.
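A quick illustration on a plain string (a hypothetical sample). Note that the bare constructor counts characters, unlike the `from_tiktoken_encoder` variant used earlier, which counts tokens:

from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(chunk_size=30, chunk_overlap=5)
chunks = splitter.split_text(
    "LangChain splits long documents into overlapping chunks for retrieval."
)
print(chunks)  # consecutive chunks may share up to 5 characters of overlap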

Summary

This article showed how to build a retrieval agent system with LangGraph and LangChain. By defining a document loader, a text splitter, and a vector store, we implemented an agent that can retrieve information from specified web pages. The system demonstrates the strength of large language models for information retrieval while providing a flexible framework that is easy to extend and customize.
