- CH_HOST = 'http://localhost:8123' # default address
- def get_clickhouse_data(query, host = CH_HOST, connection_timeout = 1500):
- r = requests.post(host, params = {'query': query},
- timeout = connection_timeout)
- return r.text
这段代码主要设置了与数据库连接的函数get_clickhouse_data(query),用于与数据库交互。我们将把这个函数的结果传递给LLM。因此,获取任何输出 DB 返回都是可以的,无论它是否是错误。LLM就能妥善处理。
使用到的表结构如下:我们只需要几个表来表示基本电子商务产品的数据模型。我们将使用用户列表 ( ecommerce.users
) 及其会话 ( ecommerce.sessions
会话 ( ecommerce.sessions
用户列表 ( ecommerce.users
) 如下
LLM 代理的核心思想是使用 LLM 作为推理引擎来定义要采取的操作集。在经典方法中,我们对一系列操作进行硬编码,但对于代理,我们为模型提供工具和任务,并让她决定如何实现它们。
事实上,这就是人类智能的运作方式:我们将内心的声音推理与面向任务的行动结合起来。假设您需要做饭。你将使用推理来定义一个计划(“客人将在 30 分钟内到来,我只有时间煮意大利面”),调整它(“本已经成为素食主义者,我应该为他点一些东西”)或决定委托哪些任务相当于外部工具(“意大利面没有了,我需要让我的伙伴去买”)。同时,你将通过行动来使用一些工具(向伙伴寻求帮助或使用搅拌机)或获取一些信息(在互联网上查找你需要煮意大利面多少分钟才能使其有嚼劲)。因此,对LLM使用类似的方法是合理的,因为它适用于人类(毫无疑问是 AGI)。
自 ReAct 以来,LLM 代理有很多不同的方法。它们的不同之处在于用于设置模型推理的提示、我们如何定义工具、输出格式、处理有关中间步骤的内存等。以下是一些流行的方法:
对于第一个版本的 LLM 代理,我们将使用 OpenAI 函数作为框架来构建代理。
装饰器来实现函数。我们将使用 Pydantic 指定每个函数的参数模式,以便模型知道要传递给函数的内容。
- from langchain.agents import tool
- from pydantic import BaseModel, Field
- from typing import Optional
- class SQLQuery(BaseModel):
- query: str = Field(description="SQL query to execute")
- @tool(args_schema = SQLQuery)
- def execute_sql(query: str) -> str:
- """Returns the result of SQL query execution"""
- return get_clickhouse_data(query)
- # 主要功能:查询数据库表信息。不同数据库的系统表不一致,需要根据不同数据库进行设置。
- class SQLTable(BaseModel):
- database: str = Field(description="Database name")
- table: str = Field(description="Table name")
- @tool(args_schema = SQLTable)
- def get_table_columns(database: str, table: str) -> str:
- """Returns list of table column names and types in JSON"""
- q = '''
- select name, type
- from system.columns
- where database = '{database}'
- and table = '{table}'
- format TabSeparatedWithNames
- '''.format(database = database, table = table)
- return str(get_clickhouse_df(q).to_dict('records'))
- # 主要功能:查询表结构:列信息。
- class SQLTableColumn(BaseModel):
- database: str = Field(description="Database name")
- table: str = Field(description="Table name")
- column: str = Field(description="Column name")
- n: Optional[int] = Field(description="Number of rows, default limit 10")
- @tool(args_schema = SQLTableColumn)
- def get_table_column_distr(database: str, table: str, column: str, n:int = 10) -> str:
- """Returns top n values for the column in JSON"""
- q = '''
- select {column}, count(1) as count
- from {database}.{table}
- group by 1
- order by 2 desc
- limit {n}
- format TabSeparatedWithNames
- '''.format(database = database, table = table, column = column, n = n)
- return str(list(get_clickhouse_df(q)[column].values))
我们将使用 OpenAI 函数并需要转换我们的工具。另外,我将我们的工具包保存在字典中。执行工具来获取观察结果时会很方便。
- from langchain.tools.render import format_tool_to_openai_function
- # converting tools into OpenAI functions
- sql_functions = list(map(format_tool_to_openai_function,
- [execute_sql, get_table_columns, get_table_column_distr]))
- # saving tools into a dictionary for the future
- sql_tools = {
- 'execute_sql': execute_sql,
- 'get_table_columns': get_table_columns,
- 'get_table_column_distr': get_table_column_distr
- }
我们已经为该模型创建了工具。现在,我们需要定义Chain。我们将使用最新的 GPT 4 Turbo,它也经过了微调以与这些功能一起使用。让我们初始化一个聊天模型。
- from langchain.chat_models import ChatOpenAI
- llm = ChatOpenAI(temperature=0.1, model = 'gpt-4-1106-preview')\
- .bind(functions = sql_functions)
- from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
- system_message = '''
- You are working as a product analyst for the e-commerce company.
- Your work is very important, since your product team makes decisions based on the data you provide. So, you are extremely accurate with the numbers you provided.
- If you're not sure about the details of the request, you don't provide the answer and ask follow-up questions to have a clear understanding.
- You are very helpful and try your best to answer the questions.
- All the data is stored in SQL Database. Here is the list of tables (in the format <database>.<table>) with descriptions:
- - ecommerce.users - information about the customers, one row - one customer
- - ecommerce.sessions - information about the sessions customers made on our web site, one row - one session
- '''
- analyst_prompt = ChatPromptTemplate.from_messages(
- [
- ("system", system_message),
- ("user", "{question}"),
- MessagesPlaceholder(variable_name="agent_scratchpad"),
- ]
- )
将它们转换为预期的格式。然后,我们将所有内容传递给 LLM,最后,使用输出解析器OpenAIFunctionsAgentOutputParser
- from langchain.agents.format_scratchpad import format_to_openai_function_messages
- from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser
- analyst_agent = (
- {
- "question": lambda x: x["question"],
- "agent_scratchpad": lambda x: format_to_openai_function_messages(x["intermediate_steps"]),
- }
- | analyst_prompt
- | llm
- | OpenAIFunctionsAgentOutputParser()
- )
- analyst_agent.invoke({"question": "How many active customers from the United Kingdom do we have?",
- "intermediate_steps": []})
- # AgentActionMessageLog(
- # tool='execute_sql',
- # tool_input={'query': "SELECT COUNT(DISTINCT user_id) AS active_customers_uk FROM ecommerce.sessions WHERE country = 'United Kingdom' AND active = TRUE"},
- # log='\nInvoking: `execute_sql` with `{\'query\': "SELECT COUNT(DISTINCT user_id) AS active_customers_uk FROM ecommerce.sessions WHERE country = \'United Kingdom\' AND active = TRUE"}`\n\n\n',
- # message_log=[AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{"query":"SELECT COUNT(DISTINCT user_id) AS active_customers_uk FROM ecommerce.sessions WHERE country = \'United Kingdom\' AND active = TRUE"}', 'name': 'execute_sql'}})]
- # )
- SELECT COUNT(DISTINCT user_id) AS active_customers_uk
- FROM ecommerce.sessions
- WHERE country = 'United Kingdom' AND active = TRUE
. 看看 LLM 是否能够从这个错误中恢复并返回结果将会很有趣。
- from langchain_core.agents import AgentFinish
- # setting initial parameters
- question = "How many active customers from the United Kingdom do we have?"
- intermediate_steps = []
- num_iters = 0
- while True:
- # breaking if there were more than 10 iterations
- if num_iters >= 10:
- break
- # invoking the agent chain
- output = analyst_agent.invoke(
- {
- "question": question,
- "intermediate_steps": intermediate_steps,
- }
- )
- num_iters += 1
- # returning the final result if we got the AgentFinish object
- if isinstance(output, AgentFinish):
- model_output = output.return_values["output"]
- break
- # calling tool and adding observation to the scratchpad otherwise
- else:
- print(f'Executing tool: {output.tool}, arguments: {output.tool_input}')
- observation = sql_tools[output.tool](output.tool_input)
- print(f'Observation: {observation}')
- print()
- intermediate_steps.append((output, observation))
- print('Model output:', model_output)
我在输出中添加了一些工具使用情况的日志记录,以查看执行情况。此外,您始终可以使用 LangChain 调试模式来查看所有调用。
- Executing tool: execute_sql, arguments: {'query': "SELECT COUNT(*) AS active_customers_uk FROM ecommerce.users WHERE country = 'United Kingdom' AND active = TRUE"}
- Observation: Code: 47. DB::Exception: Missing columns: 'active'
- while processing query: 'SELECT count() AS active_customers_uk
- FROM ecommerce.users WHERE (country = 'United Kingdom') AND (active = true)',
- required columns: 'country' 'active', maybe you meant: 'country'.
- (UNKNOWN_IDENTIFIER) (version (official build))
- Executing tool: get_table_columns, arguments: {'database': 'ecommerce', 'table': 'users'}
- Observation: [{'name': 'user_id', 'type': 'UInt64'}, {'name': 'country', 'type': 'String'},
- {'name': 'is_active', 'type': 'UInt8'}, {'name': 'age', 'type': 'UInt64'}]
- Executing tool: execute_sql, arguments: {'query': "SELECT COUNT(*) AS active_customers_uk FROM ecommerce.users WHERE country = 'United Kingdom' AND is_active = 1"}
- Observation: 111469
- Model output: We have 111,469 active customers from the United Kingdom.
因此,模型尝试执行 SQL,但收到错误:没有列active
我们可以利用模块化框架的优势:我们只需要更改一个参数agent = AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION
- system_message = system_message + '''\n您可以访问以下工具:'''
- agent_kwargs = {
- "prefix" : system_message
- }
- Analyst_agent_react = initialize_agent(
- llm = ChatOpenAI(温度= 0.1 , model = 'gpt-4-1106-preview ' )、
- tools = [execute_sql、get_table_columns、get_table_column_distr]、
- agent_kwargs = agent_kwargs、
- verbose = True、
- max_iterations = 10、
- early_stopping_method = 'generate'
- )
- Analyst_agent_react.run( "我们有多少来自英国的活跃客户?" )
尽管模型遵循不同的路径(从理解表模式开始,然后执行 SQL),但它得到了相同的结果。
此方案灵感来自 BabyAGI 框架和论文“Plan-and-Solve Prompting”,该代理遵循“计划和执行”方法。这种方法的特点是代理首先尝试计划接下来的步骤,然后执行它们。
这种方法的优点是可以进行分离:一个模型专注于计划(推理),而另一个模型专注于执行(行动)。它更加模块化,并且有可能您可以使用针对您的特定任务进行微调的更小、更便宜的模型。然而,这种方法也会生成更多的 LLM 调用,因此如果我们使用 ChatGPT,成本会更高。
- Analyst_agent_plan_and_execute = PlanAndExecute(
- planner=planner,
- executor=executor
- )
- Analyst_agent_plan_and_execute.run( "我们有多少来自英国的活跃客户?" )from langchain_experimental.plan_and_execute import PlanAndExecute, load_agent_executor, load_chat_planner
- model = ChatOpenAI(temperature=0.1, model = 'gpt-4-1106-preview')
- planner = load_chat_planner(model)
- executor = load_agent_executor(model,
- tools = [execute_sql, get_table_columns, get_table_column_distr],
- verbose=True)
- Analyst_agent_plan_and_execute = PlanAndExecute(
- planner=planner,
- executor=executor
- )
- Analyst_agent_plan_and_execute.run( "我们有多少来自英国的活跃客户?" )
调用返回错误:RateLimitError: Error code: 429 — {'error': {'message': 'Request too large for gpt-4–1106-preview in organization on tokens_usage_based per min: Limit 150000, Requested 235832.', 'type': 'tokens_usage_based', 'param': None, 'code': 'rate_limit_exceeded'}}
这种方法使用检索,因此我们需要建立向量存储和嵌入模型。我使用开源轻量级Chroma进行存储和 OpenAI 嵌入。
- from langchain.embeddings.openai import OpenAIEmbeddings
- from langchain.vectorstores import Chroma
- embedding = OpenAIEmbeddings()
- persist_directory = 'vector_store'
- vectordb = Chroma(
- persist_directory=persist_directory,
- embedding_function=embedding
- )
首先,我们将创建一个 TO-DO 链,稍后将其用作执行器的工具。
- from langchain.chains import LLMChain
- from langchain.prompts import PromptTemplate
- todo_prompt_message = '''
- You are a planner who is an expert at coming up with a todo list for
- a given objective. Come up with a todo list for this objective: {objective}
- '''
- todo_prompt = PromptTemplate.from_template(todo_prompt_message)
- todo_chain = LLMChain(llm=OpenAI(temperature=0.1,
- model = 'gpt-4-1106-preview'), prompt=todo_prompt)
- from langchain.agents import AgentExecutor, Tool, ZeroShotAgent
- from langchain.prompts import PromptTemplate
- tools = [
- execute_sql,
- get_table_columns,
- get_table_column_distr,
- Tool(
- name="TODO",
- func=todo_chain.run,
- description="useful for when you need to come up with todo lists. Input: an objective to create a todo list for. Output: a todo list for that objective. Please be very clear what the objective is!",
- )
- ]
- prefix = """
- You are an AI who performs one task based on the following objective: {objective}. Take into account these previously completed tasks: {context}.
- You are asked questions related to analytics for e-commerce product.
- Your work is very important, since your product team makes decisions based on the data you provide. So, you are extremely accurate with the numbers you provided.
- If you're not sure about the details of the request, you don't provide the answer and ask follow-up questions to have a clear understanding.
- You are very helpful and try your best to answer the questions.
- All the data is stored in SQL Database. Here is the list of tables (in the format <database>.<table>) with descriptions:
- - ecommerce.users - information about the customers, one row - one customer
- - ecommerce.sessions - information about the sessions customers made on our web site, one row - one session
- """
- suffix = """Question: {task}
- {agent_scratchpad}"""
- prompt = ZeroShotAgent.create_prompt(
- tools,
- prefix=prefix,
- suffix=suffix,
- input_variables=["objective", "task", "context", "agent_scratchpad"],
- )
- llm = OpenAI(temperature=0.1)
- llm_chain = LLMChain(llm=llm, prompt=prompt)
- tool_names = [tool.name for tool in tools]
- analyst_agent_babyagi = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=tool_names)
- analyst_agent_babyagi_executor = AgentExecutor.from_agent_and_tools(
- agent=analyst_agent_babyagi, tools=tools, verbose=True
- )
最后一步是定义 BabyAGI 执行器并运行它。
- from langchain_experimental.autonomous_agents import BabyAGI
- baby_agi = BabyAGI.from_llm(
- llm=llm,
- vectorstore=vectordb,
- task_execution_chain=analyst_agent_babyagi_executor,
- verbose=True,
- max_iterations=10
- )
- baby_agi("Find, how many active customers from the United Kingdom we have.")
该模型决定不使用 TO-DO 函数创建待办事项列表,而是跳转到查询 SQL。但是,第一个查询不正确。该模型尝试恢复并调用get_table_columns函数来获取列名称,但未能遵循架构。它们无法遵循结构。
工具,以便模型可以看到所有可用的表。我希望它能帮助模型编写正确的 SQL 查询。
- @tool()
- def get_tables() -> str:
- """Returns list of tables in the format <database>.<table>"""
- return ['ecommerce.users', 'ecommerce.sessions']
让我们创建一个 AutoGPT 代理,它就像一个函数调用一样简单。然后,让我们执行它并看看它是如何工作的。
- from langchain_experimental.autonomous_agents import AutoGPT
- analyst_agent_autogpt = AutoGPT.from_llm_and_tools(
- ai_name="Harry",
- ai_role="Assistant",
- tools= [execute_sql, get_table_columns,
- get_table_column_distr, get_tables],
- llm=ChatOpenAI(temperature=0.1, model = 'gpt-4-1106-preview'),
- memory=vectordb.as_retriever(),
- )
- analyst_agent_autogpt.chain.verbose = True
- analyst_agent_autogpt.run(["Find how many active customers from the United Kingdom we have."])
该模型能够得出正确的答案:“来自英国的活跃客户数量为 111469。”
阅读提示很有趣,因为我们使用了默认提示。您可以通过 访问它analyst_agent_autogpt.chain.prompt
- {
- "thoughts": {
- "text": "To find the number of active customers from the United Kingdom, I need to identify the relevant database and table that contains customer information. Once I have that, I can look for columns that might indicate customer activity and country. The first step is to list all the available tables.",
- "reasoning": "Understanding the structure of the database is essential before I can execute a query to find the active customers. Listing tables is a logical first step to identify where customer data might be stored.",
- "plan": "- Use the get_tables command to list all available tables.\n- Identify the table that likely contains customer information.\n- Use get_table_columns to understand the structure of the customer table.\n- Execute an SQL query to count active UK customers.",
- "criticism": "I need to ensure that I'm efficient with the commands I use, aiming to gather the necessary information in as few steps as possible.",
- "speak": "I am now going to list all the available tables to find where customer information is stored."
- },
- "command": {
- "name": "get_tables",
- "args": {}
- }
- }
- {
- "thoughts": {
- "text": "The 'ecommerce.users' table is likely to contain information about the customers. To find the number of active customers from the United Kingdom, I need to understand the structure of this table, specifically looking for columns that might indicate whether a customer is active and their country.",
- "reasoning": "Knowing the structure of the 'users' table will help me determine which columns to use in an SQL query to count active UK customers.",
- "plan": "- Use the get_table_columns command to get the structure of the 'ecommerce.users' table.\n- Identify columns that indicate customer activity and country.\n- Formulate an SQL query to count active UK customers based on the identified columns.",
- "criticism": "I must continue to be efficient with my commands, ensuring that each step brings me closer to the goal without unnecessary actions.",
- "speak": "I will now retrieve the structure of the 'ecommerce.users' table to identify the relevant columns for active customers and their country."
- },
- "command": {
- "name": "get_table_columns",
- "args": {
- "database": "ecommerce",
- "table": "users"
- }
- }
- }
- {
- "thoughts": {
- "text": "With the structure of the 'ecommerce.users' table known, I can see that there are columns for 'country' and 'is_active', which are necessary to identify active customers from the United Kingdom.",
- "reasoning": "The 'country' column will allow me to filter customers by their country, and the 'is_active' column will allow me to filter customers by their activity status. With this information, I can construct an SQL query to count the number of active customers from the UK.",
- "plan": "- Execute an SQL query using the 'execute_sql' command to count the number of active customers from the United Kingdom.\n- The query will count rows where 'country' is 'United Kingdom' and 'is_active' is true (1).",
- "criticism": "I need to ensure that the SQL query is correctly formulated to retrieve the exact information needed without any errors.",
- "speak": "I am now ready to execute an SQL query to count the number of active customers from the United Kingdom."
- },
- "command": {
- "name": "execute_sql",
- "args": {
- "query": "SELECT COUNT(*) FROM ecommerce.users WHERE country = 'United Kingdom' AND is_active = 1"
- }
- }
- }
- {
- "thoughts": {
- "text": "I have successfully executed the SQL query and obtained the number of active customers from the United Kingdom, which is 111,469.",
- "reasoning": "The goal was to find the number of active customers from the UK, and the SQL query has provided this information.",
- "plan": "- Since the goal has been achieved, the next step is to use the 'finish' command to signal that I have completed all my objectives.",
- "criticism": "There is no need for further action or commands as the objective has been met.",
- "speak": "I have found the number of active customers from the United Kingdom and am ready to finish the task."
- },
- "command": {
- "name": "finish",
- "args": {
- "response": "The number of active customers from the United Kingdom is 111,469."
- }
- }
- }
总体而言,AutoGPT 看起来是一种可行的方法。不过,仍然认为目前构建代理最可靠的方法是通过 OpenAI 函数。因为AutoGPT过程是不可控的。
- uri = 'clickhouse+native://localhost/ecommerce'
- db = SQLDatabase.from_uri(uri)
- toolkit = SQLDatabaseToolkit(db=db, llm=OpenAI(temperature=0))
- agent_executor = create_sql_agent(
- llm=ChatOpenAI(temperature=0.1, model = 'gpt-4-1106-preview'),
- toolkit=toolkit,
- verbose=True,
- agent_type=AgentType.OPENAI_FUNCTIONS
- )
- agent_executor.run("How many active customers from the United Kingdom do we have?")
- > Entering new AgentExecutor chain...
- Invoking: `sql_db_list_tables` with ``
- sessions, users
- Invoking: `sql_db_schema` with `users`
- CREATE TABLE users (
- user_id UInt64,
- country String,
- is_active UInt8,
- age UInt64
- ) ENGINE = Log
- /*
- 3 rows from users table:
- user_id country is_active age
- 1000001 United Kingdom 0 70
- 1000002 France 1 87
- 1000003 France 1 88
- */
- Invoking: `sql_db_query` with `SELECT COUNT(*) FROM users WHERE country = 'United Kingdom' AND is_active = 1`
- [(111469,)]We have 111,469 active customers from the United Kingdom.
- > Finished chain.
- 'We have 111,469 active customers from the United Kingdom.'
其实这里langchain中两种方式,第一种构建chain,执行chain.run(question)。第二种是toolkit = SQLDatabaseToolkit(db=db, llm=llm) 然后构建Agent使用toolkit方法。这里是直接使用的第二种。
本文探究如何创建不同类型的代理实现text2sql。我们已经实现了一个由 LLM 支持的代理,可以完全从头开始使用 SQL 数据库。然后,我们利用高级 LangChain 工具通过几个函数调用来实现相同的结果。我们可以添加数据库Agent作为 LLM 支持的分析师的工具。这将是我们的一项技能。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。