InternLM (书生·浦语) LLM Practical Camp: Building Agent Applications with Lagent & AgentLego
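Lagent Overview
Lagent is a lightweight open-source agent framework designed to let users efficiently build agents based on large language models (LLMs). It also provides a set of typical tools that extend what an LLM can do.
Lagent currently supports several classic agent paradigms, including AutoGPT and ReAct, as well as the following tools:
Arxiv Search
Bing Maps
Google Scholar Search
Google Search
Interactive IPython Interpreter
IPython Interpreter
Python Interpreter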
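AgentLego Overview
AgentLego is a multimodal toolkit that provides a variety of open-source tool APIs. Like LEGO bricks, it lets users quickly and easily extend custom tools and assemble their own agents. With the AgentLego library you can use its tools directly, or combine them with an agent framework (such as Lagent or Transformers Agent) to quickly build agents that extend the capabilities of an LLM.
AgentLego currently provides the following tools:
Lagent is an agent framework, whereas AgentLego is not directly tied to LLM agents: it is a toolkit that plugs into the tool-support module of an agent framework.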
When creating the development machine, select Cuda12.2-conda as the image and 30% A100 as the GPU.
Create a directory for the Agent-related files:
mkdir -p /root/agent
Configure the conda environment with the following command:
studio-conda -t agent -o pytorch-2.1.2
Install Lagent and AgentLego
Lagent and AgentLego each offer two installation methods: installing directly with pip, or installing from source. To make it easier to use Lagent's Web Demo and AgentLego's WebUI, we install from source here. Both projects also provide help documentation for installing from source.
cd /root/agent
conda activate agent
git clone https://gitee.com/internlm/lagent.git
cd lagent && git checkout 581d9fb && pip install -e . && cd ..
git clone https://gitee.com/internlm/agentlego.git
cd agentlego && git checkout 7769e0d && pip install -e . && cd ..
Install the other dependencies we will need, such as LMDeploy, with the following commands:
conda activate agent
pip install lmdeploy==0.3.0
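After installation, it can help to confirm which versions ended up in the agent environment. The sketch below uses only the standard library's importlib.metadata. The distribution names lagent, agentlego, and lmdeploy are assumed to match the pip package names, and installed_version is a helper name introduced here for illustration.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Assumed distribution names; adjust if your environment differs.
    for name in ("lagent", "agentlego", "lmdeploy"):
        print(f"{name}: {installed_version(name) or 'not installed'}")
```

Run it inside the activated agent environment; a "not installed" line usually means the pip install step ran in a different environment.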
Download the Tutorial repository with git clone:
cd /root/agent
git clone -b camp2 https://gitee.com/internlm/Tutorial.git
In this part, we will try out the Lagent Web Demo, define a custom tool with Lagent, and see the custom tool in action.
Lagent Web Demo
Deploying with LMDeploy
Because the Lagent Web Demo depends on an api_server started by LMDeploy, first run the following commands in a VS Code terminal to start an api_server with LMDeploy:
conda activate agent
lmdeploy serve api_server /root/share/new_models/Shanghai_AI_Laboratory/internlm2-chat-7b \
--server-name \
--model-name internlm2-chat-7b \
--cache-max-entry-count 0.1
Once both the LMDeploy api_server and the Lagent Web Demo have fully started, set up port forwarding on your local machine so that port 23333 of the LMDeploy api_server and port 7860 of the Lagent Web Demo are mapped locally. You can run:
ssh -CNg -L 7860: -L 23333: root@ssh.intern-ai.org.cn -p 43844
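Before opening the browser, you may want to confirm that the forwarded ports are reachable locally. This is a minimal sketch using the standard socket module; is_port_open is a hypothetical helper, and localhost with ports 7860 and 23333 follows the port mapping described above.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    for port in (7860, 23333):
        state = "reachable" if is_port_open("localhost", port) else "not reachable"
        print(f"localhost:{port} is {state}")
```

A successful TCP connection only shows that something is listening on the port; it does not verify that the service behind it is the api_server or the Web Demo.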
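Open http://localhost:7860 in your local browser to use the Lagent Web Demo. First enter the model IP and press Enter to confirm. Then select ArxivSearch as the plugin so that the model can search for papers on arXiv.
- System prompt: When tools and code are enabled, choose and call the appropriate tool as needed.
- Data-analysis prompt: You can now run Python code in a stateful Jupyter notebook environment. When you send a message containing Python code to python, it is executed in that environment. This tool suits many scenarios, such as data analysis or processing (data manipulation, statistical analysis, plotting), complex computation (solving math and physics problems), programming examples (understanding programming concepts or features), text processing and analysis (such as text parsing and natural language processing), machine learning and data science (demonstrating model training and data visualization), and file operations and data import (handling CSV, JSON, and other formats).
- Plugin prompt:
- You can use the following tools:
- {prompt}
- If you already have enough information, answer directly. Avoid unnecessary tool calls! Also pay attention to which tools are available to you, and do not make any up!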
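The {prompt} placeholder in the plugin prompt is filled with a JSON description of the enabled tools. Here is a minimal sketch of that substitution, using an English rendering of the template and a made-up tool description (the field values are illustrative, not Lagent's actual ArxivSearch metadata; build_plugin_prompt is a hypothetical helper):

```python
import json

# English rendering of the plugin prompt; {prompt} is the only placeholder.
PLUGIN_TEMPLATE = (
    "You can use the following tools:"
    "\n{prompt}\n"
    "If you already have enough information, answer directly. "
    "Avoid unnecessary tool calls!"
)

# Hypothetical tool description, for illustration only.
tools = [{
    "name": "ArxivSearch.get_arxiv_article_information",
    "description": "Search arXiv and return article metadata.",
    "parameters": [{"name": "query", "type": "STRING"}],
    "required": ["query"],
}]


def build_plugin_prompt(tool_descriptions: list) -> str:
    """Serialize tool descriptions and substitute them into the template."""
    return PLUGIN_TEMPLATE.format(
        prompt=json.dumps(tool_descriptions, ensure_ascii=False, indent=4))


if __name__ == "__main__":
    print(build_plugin_prompt(tools))
```

Passing ensure_ascii=False keeps any non-ASCII text in the tool descriptions readable in the final prompt.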
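Enter “请帮我搜索 InternLM2 Technical Report” ("Please help me search for InternLM2 Technical Report") to have the model search for the InternLM2 technical report. The model correctly returns information about the InternLM2 technical report. It also lists other papers, but that is a consequence of how the arXiv search API behaves.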
lagent/agents/internlm2_agent.py code
import json
import logging
from copy import deepcopy
from typing import Dict, List, Optional, Union

from lagent.actions import ActionExecutor
from lagent.agents.base_agent import BaseAgent
from lagent.llms import BaseAPIModel, BaseModel
from lagent.schema import ActionReturn, ActionStatusCode, AgentReturn, AgentStatusCode, ModelStatusCode  # noqa: E501

API_PREFIX = (
    "This is the subfunction for tool '{tool_name}', you can use this tool. "
    'The description of this function is: \n{description}')

META_CN = ('当开启工具以及代码时,根据需求选择合适的工具进行调用')

INTERPRETER_CN = ('你现在已经能够在一个有状态的 Jupyter 笔记本环境中运行 Python 代码。'
                  '当你向 python 发送含有 Python 代码的消息时,它将在该环境中执行。'
                  '这个工具适用于多种场景,如数据分析或处理(包括数据操作、统计分析、图表绘制),'
                  '复杂的计算问题(解决数学和物理难题),编程示例(理解编程概念或特性),'
                  '文本处理和分析(比如文本解析和自然语言处理),'
                  '机器学习和数据科学(用于展示模型训练和数据可视化),'
                  '以及文件操作和数据导入(处理CSV、JSON等格式的文件)。')

PLUGIN_CN = ('你可以使用如下工具:'
             '\n{prompt}\n'
             '如果你已经获得足够信息,请直接给出答案. 避免不必要的工具调用! '
             '同时注意你可以使用的工具,不要随意捏造!')


class Internlm2Protocol:

    def __init__(
        self,
        meta_prompt: str = META_CN,
        interpreter_prompt: str = INTERPRETER_CN,
        plugin_prompt: str = PLUGIN_CN,
        few_shot: Optional[List] = None,
        language: Dict = dict(
            begin='',
            end='',
            belong='assistant',
        ),
        tool: Dict = dict(
            begin='{start_token}{name}\n',
            start_token='<|action_start|>',
            name_map=dict(plugin='<|plugin|>', interpreter='<|interpreter|>'),
            belong='assistant',
            end='<|action_end|>\n',
        ),
        execute: Dict = dict(
            role='execute', begin='', end='', fallback_role='environment'),
    ) -> None:
        self.meta_prompt = meta_prompt
        self.interpreter_prompt = interpreter_prompt
        self.plugin_prompt = plugin_prompt
        self.roles_cfg = dict(tool=tool, language=language)
        self.language = language
        self.execute = execute
        self.tool = tool
        self.few_shot = few_shot

    def format_sub_role(self, messages: List[Dict]) -> List[Dict]:

        def format_interpreter(message):
            if isinstance(message['content'], dict):
                # assert message['content']['name'] == 'IPythonInterpreter'
                return dict(
                    role=message['role'],
                    name=message['name'],
                    content=message['content']['parameters']['command'])
            else:
                return message

        def format_plugin(message):
            if isinstance(message['content'], dict):
                return dict(
                    role=message['role'],
                    name=message['name'],
                    content=json.dumps(message['content']))
            else:
                return message

        new_message = list()
        for message in messages:
            if message['role'] in [
                    'assistant', 'user', 'system', 'environment'
            ]:
                new_message.append(message)
                continue
            role_cfg = self.roles_cfg[message['role']]
            begin = role_cfg['begin']
            if message['role'] == 'tool':
                if message['name'] == 'interpreter':
                    message = format_interpreter(message)
                elif message['name'] == 'plugin':
                    message = format_plugin(message)
                else:
                    raise NotImplementedError
                begin = role_cfg['begin'].format(
                    start_token=role_cfg.get('start_token', ''),
                    name=role_cfg.get('name_map', {}).get(message['name'], ''))
            new_content = begin + message['content'] + role_cfg['end']
            if role_cfg.get('fallback_role'):
                new_message.append(
                    dict(role=role_cfg['fallback_role'], content=new_content))
            elif role_cfg.get('belong'):
                if new_message[-1]['role'] != role_cfg.get('belong'):
                    new_message.append(
                        dict(role=role_cfg.get('belong'), content=new_content))
                else:
                    new_message[-1]['content'] += new_content
            else:
                new_message.append(
                    dict(role=message['role'], content=new_content))

        return new_message

    def format(self,
               inner_step: List[Dict],
               plugin_executor: ActionExecutor = None,
               interpreter_executor: ActionExecutor = None,
               **kwargs) -> list:
        formatted = []
        if self.meta_prompt:
            formatted.append(dict(role='system', content=self.meta_prompt))
        if interpreter_executor and self.interpreter_prompt:
            interpreter_info = interpreter_executor.get_actions_info()[0]
            interpreter_prompt = self.interpreter_prompt.format(
                code_prompt=interpreter_info['description'])
            formatted.append(
                dict(
                    role='system',
                    content=interpreter_prompt,
                    name='interpreter'))
        if plugin_executor and plugin_executor.actions and self.plugin_prompt:
            plugin_descriptions = []
            for api_info in plugin_executor.get_actions_info():
                plugin = deepcopy(api_info)
                if isinstance(api_info, dict):
                    tool_name = api_info['name'].split('.')[0]
                    plugin['description'] = API_PREFIX.format(
                        tool_name=tool_name, description=plugin['description'])
                    # only keep required parameters
                    required_parameters = [
                        param for param in plugin['parameters']
                        if param['name'] in plugin['required']
                    ]
                    plugin['parameters'] = required_parameters
                plugin_descriptions.append(plugin)
            plugin_prompt = self.plugin_prompt.format(
                prompt=json.dumps(
                    plugin_descriptions, ensure_ascii=False, indent=4))
            formatted.append(
                dict(role='system', content=plugin_prompt, name='plugin'))
        if self.few_shot:
            for few_shot in self.few_shot:
                formatted += self.format_sub_role(few_shot)
        formatted += self.format_sub_role(inner_step)
        return formatted

    def parse(self, message, plugin_executor: ActionExecutor,
              interpreter_executor: ActionExecutor):
        if self.language['begin']:
            message = message.split(self.language['begin'])[-1]
        if self.tool['name_map']['plugin'] in message:
            message, action = message.split(
                f"{self.tool['start_token']}{self.tool['name_map']['plugin']}")
            action = action.split(self.tool['end'].strip())[0]
            return 'plugin', message, action
        if self.tool['name_map']['interpreter'] in message:
            message, code = message.split(
                f"{self.tool['start_token']}"
                f"{self.tool['name_map']['interpreter']}")
            code = code.split(self.tool['end'].strip())[0].strip()
            return 'interpreter', message, dict(
                name=interpreter_executor.action_names()[0],
                parameters=dict(
                    command=code)) if interpreter_executor else None
        return None, message.split(self.tool['start_token'])[0], None

    def format_response(self, action_return, name) -> dict:
        if action_return.state == ActionStatusCode.SUCCESS:
            response = action_return.format_result()
        else:
            response = str(action_return.errmsg)
        content = self.execute['begin'] + response + self.execute['end']
        if self.execute.get('fallback_role'):
            return dict(
                role=self.execute['fallback_role'], content=content, name=name)
        elif self.execute.get('belong'):
            return dict(
                role=self.execute['belong'], content=content, name=name)
        return dict(role=self.execute['role'], content=response, name=name)


class Internlm2Agent(BaseAgent):

    def __init__(self,
                 llm: Union[BaseModel, BaseAPIModel],
                 plugin_executor: ActionExecutor = None,
                 interpreter_executor: ActionExecutor = None,
                 protocol=Internlm2Protocol(),
                 max_turn: int = 3) -> None:
        self.max_turn = max_turn
        self._interpreter_executor = interpreter_executor
        super().__init__(
            llm=llm, action_executor=plugin_executor, protocol=protocol)

    def chat(self, message: Union[str, Dict], **kwargs) -> AgentReturn:
        if isinstance(message, str):
            message = dict(role='user', content=message)
        if isinstance(message, dict):
            message = [message]
        inner_history = message[:]
        offset = len(inner_history)
        agent_return = AgentReturn()
        for _ in range(self.max_turn):
            # list of dict
            prompt = self._protocol.format(
                inner_step=inner_history,
                plugin_executor=self._action_executor,
                interpreter_executor=self._interpreter_executor,
            )
            response = self._llm.chat(prompt, **kwargs)
            name, language, action = self._protocol.parse(
                message=response,
                plugin_executor=self._action_executor,
                interpreter_executor=self._interpreter_executor,
            )
            if name:
                if name == 'plugin':
                    if self._action_executor:
                        executor = self._action_executor
                    else:
                        logging.info(msg='No plugin is instantiated!')
                        continue
                    try:
                        action = json.loads(action)
                    except Exception as e:
                        logging.info(msg=f'Invaild action {e}')
                        continue
                elif name == 'interpreter':
                    if self._interpreter_executor:
                        executor = self._interpreter_executor
                    else:
                        logging.info(msg='No interpreter is instantiated!')
                        continue
                else:
                    logging.info(
                        msg=(f"Invalid name '{name}'. Currently only 'plugin' "
                             "and 'interpreter' are supported."))
                    continue
                action_return: ActionReturn = executor(action['name'],
                                                       action['parameters'])
                action_return.thought = language
                agent_return.actions.append(action_return)
            inner_history.append(dict(role='language', content=language))
            if not name or action_return.type == executor.finish_action.name:
                agent_return.response = language
                agent_return.state = AgentStatusCode.END
                break
            else:
                inner_history.append(
                    dict(role='tool', content=action, name=name))
                inner_history.append(
                    self._protocol.format_response(action_return, name=name))
        agent_return.inner_steps = inner_history[offset:]
        return agent_return

    def stream_chat(self, message: List[dict], **kwargs) -> AgentReturn:
        if isinstance(message, str):
            message = dict(role='user', content=message)
        if isinstance(message, dict):
            message = [message]
        inner_history = message[:]
        offset = len(inner_history)
        agent_return = AgentReturn()
        last_agent_state = AgentStatusCode.SESSION_READY
        for _ in range(self.max_turn):
            # list of dict
            prompt = self._protocol.format(
                inner_step=inner_history,
                plugin_executor=self._action_executor,
                interpreter_executor=self._interpreter_executor,
            )
            response = ''
            for model_state, res, _ in self._llm.stream_chat(prompt, **kwargs):
                model_state: ModelStatusCode
                response = res
                if model_state.value < 0:
                    agent_return.state = getattr(AgentStatusCode,
                                                 model_state.name)
                    yield deepcopy(agent_return)
                    return
                else:
                    name, language, action = self._protocol.parse(
                        message=response,
                        plugin_executor=self._action_executor,
                        interpreter_executor=self._interpreter_executor,
                    )
                    if name:
                        if model_state == ModelStatusCode.END:
                            agent_state = last_agent_state + 1
                            if name == 'plugin':
                                if self._action_executor:
                                    executor = self._action_executor
                                else:
                                    logging.info(
                                        msg='No plugin is instantiated!')
                                    continue
                                try:
                                    action = json.loads(action)
                                except Exception as e:
                                    logging.info(msg=f'Invaild action {e}')
                                    continue
                            elif name == 'interpreter':
                                if self._interpreter_executor:
                                    executor = self._interpreter_executor
                                else:
                                    logging.info(
                                        msg='No interpreter is instantiated!')
                                    continue
                            agent_return.state = agent_state
                            agent_return.response = action
                        else:
                            agent_state = (
                                AgentStatusCode.PLUGIN_START if name
                                == 'plugin' else AgentStatusCode.CODING)
                            if agent_state != last_agent_state:
                                # agent_return.state = agent_state
                                agent_return.response = language
                                yield deepcopy(agent_return)
                            agent_return.state = agent_state
                            agent_return.response = action
                    else:
                        agent_state = AgentStatusCode.STREAM_ING
                        agent_return.state = agent_state
                        agent_return.response = language
                    last_agent_state = agent_state
                    yield deepcopy(agent_return)
            if name:
                action_return: ActionReturn = executor(action['name'],
                                                       action['parameters'])
                action_return.thought = language
                agent_return.actions.append(action_return)
            inner_history.append(dict(role='language', content=language))
            if not name:
                agent_return.response = language
                break
            elif action_return.type == executor.finish_action.name:
                try:
                    response = action_return.args['text']['response']
                except Exception:
                    logging.info(msg='Unable to parse FinishAction.')
                    response = ''
                agent_return.response = response
                break
            else:
                inner_history.append(
                    dict(role='tool', content=action, name=name))
                inner_history.append(
                    self._protocol.format_response(action_return, name=name))
                agent_state += 1
                agent_return.state = agent_state
                yield agent_return
        agent_return.inner_steps = deepcopy(inner_history[offset:])
        agent_return.state = AgentStatusCode.END
        yield agent_return
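The parsing step in Internlm2Protocol.parse can be hard to follow inside the full class. The standalone sketch below mirrors only the plugin branch: it splits a model response on the <|action_start|><|plugin|> and <|action_end|> markers taken from the default tool config above. parse_plugin_call and the sample response are made up for illustration and are not part of Lagent.

```python
import json
from typing import Optional, Tuple

START = "<|action_start|>"
PLUGIN = "<|plugin|>"
END = "<|action_end|>"


def parse_plugin_call(response: str) -> Tuple[str, Optional[dict]]:
    """Split a response into (thought, action); action is None if absent."""
    marker = START + PLUGIN
    if marker not in response:
        # No plugin call: keep only the text before any stray start token.
        return response.split(START)[0], None
    thought, rest = response.split(marker, 1)
    raw_action = rest.split(END)[0]
    return thought, json.loads(raw_action)


if __name__ == "__main__":
    # Made-up model output, for illustration only.
    sample = ('I will search arXiv.' + START + PLUGIN + '\n'
              '{"name": "ArxivSearch.get_arxiv_article_information", '
              '"parameters": {"query": "InternLM2 Technical Report"}}'
              + END + '\n')
    thought, action = parse_plugin_call(sample)
    print(thought)
    print(action["name"], action["parameters"])
```

Unlike the library code, this sketch decodes the JSON inside the parser; in the agent above, json.loads is called later in chat and stream_chat so that malformed actions can be logged and retried.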
lagent/actions/arxiv_search.py code
from typing import Optional, Type

from lagent.actions.base_action import BaseAction, tool_api
from lagent.actions.parser import BaseParser, JsonParser
from lagent.schema import ActionReturn, ActionStatusCode


class ArxivSearch(BaseAction):
    """Search information from Arxiv.org. \
Useful for when you need to answer questions about Physics, Mathematics, \
Computer Science, Quantitative Biology, Quantitative Finance, Statistics, \
Electrical Engineering, and Economics from scientific articles on arxiv.org.
    """

    def __init__(self,
                 top_k_results: int = 3,
                 max_query_len: int = 300,
                 doc_content_chars_max: int = 1500,
                 description: Optional[dict] = None,
                 parser: Type[BaseParser] = JsonParser,
                 enable: bool = True):
        super().__init__(description, parser, enable)
        self.top_k_results = top_k_results
        self.max_query_len = max_query_len
        self.doc_content_chars_max = doc_content_chars_max

    @tool_api(explode_return=True)
    def get_arxiv_article_information(self, query: str) -> dict:
        """Run Arxiv search and get the article meta information.

        Args:
            query (:class:`str`): the content of search query

        Returns:
            :class:`dict`: article information
                * content (str): a list of 3 arxiv search papers
        """
        import arxiv

        try:
            results = arxiv.Search(  # type: ignore
                query[:self.max_query_len],
                max_results=self.top_k_results).results()
        except Exception as exc:
            return ActionReturn(
                errmsg=f'Arxiv exception: {exc}',
                state=ActionStatusCode.HTTP_ERROR)
        docs = [
            f'Published: {result.updated.date()}\nTitle: {result.title}\n'
            f'Authors: {", ".join(a.name for a in result.authors)}\n'
            f'Summary: {result.summary[:self.doc_content_chars_max]}'
            for result in results
        ]
        if docs:
            return {'content': '\n\n'.join(docs)}
        return {'content': 'No good Arxiv Result was found'}
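To see how ArxivSearch turns search results into the text handed back to the model, without calling the arXiv API, the sketch below applies the same formatting to stand-in objects. FakeAuthor, FakeResult, format_docs, and the sample values are hypothetical; only the attribute names (updated, title, authors, summary) follow the code above.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class FakeAuthor:
    name: str


@dataclass
class FakeResult:
    updated: datetime
    title: str
    authors: List[FakeAuthor]
    summary: str


def format_docs(results: List[FakeResult],
                doc_content_chars_max: int = 1500) -> str:
    """Format results the way ArxivSearch does, truncating each summary."""
    docs = [
        f'Published: {r.updated.date()}\nTitle: {r.title}\n'
        f'Authors: {", ".join(a.name for a in r.authors)}\n'
        f'Summary: {r.summary[:doc_content_chars_max]}'
        for r in results
    ]
    return '\n\n'.join(docs) if docs else 'No good Arxiv Result was found'


if __name__ == "__main__":
    # Invented sample record, not a real paper.
    demo = FakeResult(datetime(2024, 1, 1), "A Sample Paper",
                      [FakeAuthor("A. Author"), FakeAuthor("B. Author")],
                      "x" * 2000)
    print(format_docs([demo], doc_content_chars_max=20))
```

The truncation explains why the model sees only the first part of long abstracts: each summary is cut to doc_content_chars_max characters before being joined.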
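Defining a Custom Tool with Lagent
In this section, we build a custom tool on top of Lagent. Lagent's documentation on tools is at https://lagent.readthedocs.io/zh-cn/latest/tutorials/action.html . Defining a custom tool with Lagent involves the following steps:
Inherit from the BaseAction class
Implement the run method for a simple tool, or implement the functionality of each sub-tool in a toolkit
Decorating the run method of a simple tool with tool_api is optional; every sub-tool in a toolkit must be decorated with tool_api
Below we implement a tool that calls the QWeather (和风天气) API to query real-time weather.
First create the tool file with touch /root/agent/lagent/lagent/actions/weather.py; its content is as follows:
vim /root/agent/lagent/lagent/actions/weather.py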
import json
import os
import requests
from typing import Optional, Type

from lagent.actions.base_action import BaseAction, tool_api
from lagent.actions.parser import BaseParser, JsonParser
from lagent.schema import ActionReturn, ActionStatusCode


class WeatherQuery(BaseAction):
    """Weather plugin for querying weather information."""

    def __init__(self,
                 key: Optional[str] = None,
                 description: Optional[dict] = None,
                 parser: Type[BaseParser] = JsonParser,
                 enable: bool = True) -> None:
        super().__init__(description, parser, enable)
        key = os.environ.get('WEATHER_API_KEY', key)
        if key is None:
            raise ValueError(
                'Please set Weather API key either in the environment '
                'as WEATHER_API_KEY or pass it as `key`')
        self.key = key
        self.location_query_url = 'https://geoapi.qweather.com/v2/city/lookup'
        self.weather_query_url = 'https://devapi.qweather.com/v7/weather/now'

    @tool_api
    def run(self, query: str) -> ActionReturn:
        """一个天气查询API。可以根据城市名查询天气信息。

        Args:
            query (:class:`str`): The city name to query.
        """
        tool_return = ActionReturn(type=self.name)
        status_code, response = self._search(query)
        if status_code == -1:
            tool_return.errmsg = response
            tool_return.state = ActionStatusCode.HTTP_ERROR
        elif status_code == 200:
            parsed_res = self._parse_results(response)
            tool_return.result = [dict(type='text', content=str(parsed_res))]
            tool_return.state = ActionStatusCode.SUCCESS
        else:
            tool_return.errmsg = str(status_code)
            tool_return.state = ActionStatusCode.API_ERROR
        return tool_return

    def _parse_results(self, results: dict) -> str:
        """Parse the weather results from QWeather API.

        Args:
            results (dict): The weather content from QWeather API
                in json format.

        Returns:
            str: The parsed weather results.
        """
        now = results['now']
        data = [
            f'数据观测时间: {now["obsTime"]}',
            f'温度: {now["temp"]}°C',
            f'体感温度: {now["feelsLike"]}°C',
            f'天气: {now["text"]}',
            f'风向: {now["windDir"]},角度为 {now["wind360"]}°',
            f'风力等级: {now["windScale"]},风速为 {now["windSpeed"]} km/h',
            f'相对湿度: {now["humidity"]}',
            f'当前小时累计降水量: {now["precip"]} mm',
            f'大气压强: {now["pressure"]} 百帕',
            f'能见度: {now["vis"]} km',
        ]
        return '\n'.join(data)

    def _search(self, query: str):
        # get city_code
        try:
            city_code_response = requests.get(
                self.location_query_url,
                params={'key': self.key, 'location': query}
            )
        except Exception as e:
            return -1, str(e)
        if city_code_response.status_code != 200:
            return city_code_response.status_code, city_code_response.json()
        city_code_response = city_code_response.json()
        if len(city_code_response['location']) == 0:
            return -1, '未查询到城市'
        city_code = city_code_response['location'][0]['id']
        # get weather
        try:
            weather_response = requests.get(
                self.weather_query_url,
                params={'key': self.key, 'location': city_code}
            )
        except Exception as e:
            return -1, str(e)
        return weather_response.status_code, weather_response.json()
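The _parse_results method reads only the now object of the weather response. The sketch below reproduces that formatting for a few fields on a hand-written payload, with English labels for readability. The values are invented for illustration and are not real QWeather output, and parse_now is a hypothetical standalone helper.

```python
def parse_now(results: dict) -> str:
    """Format a subset of the 'now' fields, mirroring _parse_results."""
    now = results['now']
    data = [
        f'Observation time: {now["obsTime"]}',
        f'Temperature: {now["temp"]}°C',
        f'Weather: {now["text"]}',
        f'Relative humidity: {now["humidity"]}',
    ]
    return '\n'.join(data)


if __name__ == "__main__":
    # Invented payload; field names follow the code above.
    sample = {
        'now': {
            'obsTime': '2024-04-01T12:00+08:00',
            'temp': '20',
            'text': 'Sunny',
            'humidity': '50',
        }
    }
    print(parse_now(sample))
```

Because the fields are accessed with plain indexing, a response that lacks any of them raises KeyError, which is worth keeping in mind when the API returns an error body instead of weather data.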
To get a stable weather query service, first obtain an API key from the QWeather developer documentation (开发文档 | 和风天气开发服务).
Start the LMDeploy service and the Web Demo that the Tutorial has prepared for this part in two separate terminals:
conda activate agent
lmdeploy serve api_server /root/share/new_models/Shanghai_AI_Laboratory/internlm2-chat-7b \
--server-name \
--model-name internlm2-chat-7b \
--cache-max-entry-count 0.1

export WEATHER_API_KEY=<API key obtained in Section 2.2>
# e.g. export WEATHER_API_KEY=1234567890abcdef
conda activate agent
cd /root/agent/Tutorial/agent
streamlit run internlm2_weather_web_demo.py --server.address --server.port 7860
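Note that WeatherQuery reads the key with os.environ.get('WEATHER_API_KEY', key), so an exported environment variable takes precedence over a key passed to the constructor. Here is a minimal sketch of that precedence; resolve_key is a hypothetical helper that isolates the logic from the class:

```python
import os
from typing import Optional


def resolve_key(key: Optional[str] = None) -> str:
    """Return the API key, preferring the WEATHER_API_KEY environment variable."""
    key = os.environ.get('WEATHER_API_KEY', key)
    if key is None:
        raise ValueError(
            'Please set Weather API key either in the environment '
            'as WEATHER_API_KEY or pass it as `key`')
    return key


if __name__ == "__main__":
    try:
        print("Using key of length", len(resolve_key()))
    except ValueError as err:
        print("Key missing:", err)
```

If the Web Demo fails at startup with this ValueError, the export most likely ran in a different terminal from the one that launched streamlit.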
{
    "name": "WeatherQuery",
    "parameters": {
        "query": "上海"
    }
}
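A JSON action like the one above is what the agent loop decodes with json.loads and then hands to the executor as executor(action['name'], action['parameters']). The sketch below imitates that dispatch with a plain dictionary registry. TOOLS, fake_weather_query, and dispatch are stand-ins invented for illustration and do not reflect the internals of Lagent's ActionExecutor.

```python
import json
from typing import Callable, Dict


def fake_weather_query(query: str) -> str:
    """Stand-in tool; a real tool would call the weather API."""
    return f"(weather for {query} would be returned here)"


# Hypothetical registry mapping tool names to callables.
TOOLS: Dict[str, Callable[..., str]] = {"WeatherQuery": fake_weather_query}


def dispatch(raw_action: str) -> str:
    """Decode a JSON action and call the matching tool with its parameters."""
    action = json.loads(raw_action)
    tool = TOOLS.get(action["name"])
    if tool is None:
        return f"Unknown tool: {action['name']}"
    return tool(**action["parameters"])


if __name__ == "__main__":
    raw = '{"name": "WeatherQuery", "parameters": {"query": "上海"}}'
    print(dispatch(raw))
```

The parameter names in the JSON must match the tool's signature, which is why the model is shown each tool's parameter list in the plugin prompt.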
Next, use the AgentLego tools, try out AgentLego's WebUI, and define a custom tool based on AgentLego to see it in action.
Tutorial/agent/lagent.md at camp2 · InternLM/Tutorial · GitHub