赞
踩
使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。
FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,FastAPI 还提供了容器化部署能力,开发者可以轻松打包 AI 模型为 Docker 镜像,实现跨环境的部署和扩展。
总之,使用 FastAPI 可以大大提高 AI 应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。
本篇在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)基础上,学习如何集成Tool获取实时数据,并以流式方式返回
Tool(工具)是为了增强其语言模型的功能和实用性而设计的一系列辅助手段,用于扩展模型的能力。例如代码解释器(Code Interpreter)和知识检索(Knowledge Retrieval)等都属于其工具。
https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools
基本这些工具能满足大部分需求,具体使用参见:
stream
:基本的流式传输方式,能逐步给出代理的动作和观察结果。astream
:异步的流式传输,用于异步处理需求的情况。astream_events
:更细致的流式传输,能流式传输代理的每个具体事件,如工具调用和结束、模型启动和结束等,便于深入了解和监控代理执行的详细过程。是 LangChain 相关工具的集合中心,其作用在于方便开发者发现和共享常用的提示(Prompt)、链、代理等。
它受 Hugging Face Hub 启发,促进社区交流与协作,推动 LangChain 生态发展。当前,它在新架构中被置于 LangSmith 里,主要聚焦于 Prompt。
是一个用于编写并发代码的标准库,它提供了构建异步应用程序的基础框架。
增加Google Search以及langchainhub的依赖包
- conda create -n fastapi_test python=3.10
- conda activate fastapi_test
- pip install fastapi websockets uvicorn
- pip install --quiet langchain-core langchain-community langchain-openai
- pip install google-search-results langchainhub
参见:开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)
- # -*- coding: utf-8 -*-
- import asyncio
- import os
- from langchain.agents import create_structured_chat_agent, AgentExecutor
- from langchain_community.utilities.serpapi import SerpAPIWrapper
- from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, ChatPromptTemplate
- from langchain_core.tools import tool
- from langchain_openai import ChatOpenAI
-
- os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的Open AI Key
- os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
-
-
- llm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)
-
-
- @tool
- def search(query:str):
- """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
- serp = SerpAPIWrapper()
- result = serp.run(query)
- print("实时搜索结果:", result)
- return result
-
-
- tools = [search]
-
- template='''
- Respond to the human as helpfully and accurately as possible. You have access to the following tools:
- {tools}
- Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).
- Valid "action" values: "Final Answer" or {tool_names}
- Provide only ONE action per $JSON_BLOB, as shown:
- ```
- {{
- "action": $TOOL_NAME,
- "action_input": $INPUT
- }}
- ```
- Follow this format:
- Question: input question to answer
- Thought: consider previous and subsequent steps
- Action:
- ```
- $JSON_BLOB
- ```
- Observation: action result
- ... (repeat Thought/Action/Observation N times)
- Thought: I know what to respond
- Action:
- ```
- {{
- "action": "Final Answer",
- "action_input": "Final response to human"
- }}
- Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation
- '''
- system_message_prompt = SystemMessagePromptTemplate.from_template(template)
- human_template='''
- {input}
- {agent_scratchpad}
- (reminder to respond in a JSON blob no matter what)
- '''
- human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
- prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])
-
-
- print(prompt)
-
- agent = create_structured_chat_agent(
- llm, tools, prompt
- )
-
- agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
-
-
- async def chat(params):
- events = agent_executor.astream_events(params,version="v2")
- async for event in events:
- type = event['event']
- if 'on_chat_model_stream' == type:
- data = event['data']
- chunk = data['chunk']
- content = chunk.content
- if content and len(content) > 0:
- print(content)
-
-
-
- asyncio.run(chat({"input": "广州现在天气如何?"}))
调用结果:
说明:
流式输出的数据结构为:
- {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='天', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
- type: on_chat_model_stream
- {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='气', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
在4.1使用Tool&流式输出的代码基础上进行调整
- # -*- coding: utf-8 -*-
- import asyncio
- import os
- from langchain.agents import create_structured_chat_agent, AgentExecutor
- from langchain_community.utilities.serpapi import SerpAPIWrapper
- from langchain_core.tools import tool
- from langchain_openai import ChatOpenAI
-
- os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的Open AI Key
- os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
-
- from langchain import hub
-
- llm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)
-
-
- @tool
- def search(query:str):
- """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
- serp = SerpAPIWrapper()
- result = serp.run(query)
- print("实时搜索结果:", result)
- return result
-
-
- tools = [search]
-
- prompt = hub.pull("hwchase17/structured-chat-agent")
-
- print(prompt)
-
- agent = create_structured_chat_agent(
- llm, tools, prompt
- )
-
- agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
-
-
-
- async def chat(params):
- events = agent_executor.astream_events(params,version="v2")
- async for event in events:
- type = event['event']
- if 'on_chat_model_stream' == type:
- data = event['data']
- chunk = data['chunk']
- content = chunk.content
- if content and len(content) > 0:
- print(content)
-
-
- asyncio.run(chat({"input": "广州现在天气如何?"}))
调用结果:
在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)的代码基础上进行调整
- import uvicorn
- import os
-
- from typing import Annotated
- from fastapi import (
- Depends,
- FastAPI,
- WebSocket,
- WebSocketException,
- WebSocketDisconnect,
- status,
- )
- from langchain import hub
- from langchain.agents import create_structured_chat_agent, AgentExecutor
- from langchain_community.utilities import SerpAPIWrapper
-
- from langchain_core.tools import tool
- from langchain_openai import ChatOpenAI
-
- os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的Open AI Key
- os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
-
-
- class ConnectionManager:
- def __init__(self):
- self.active_connections: list[WebSocket] = []
-
- async def connect(self, websocket: WebSocket):
- await websocket.accept()
- self.active_connections.append(websocket)
-
- def disconnect(self, websocket: WebSocket):
- self.active_connections.remove(websocket)
-
- async def send_personal_message(self, message: str, websocket: WebSocket):
- await websocket.send_text(message)
-
- async def broadcast(self, message: str):
- for connection in self.active_connections:
- await connection.send_text(message)
-
- manager = ConnectionManager()
-
- app = FastAPI()
-
- async def authenticate(
- websocket: WebSocket,
- userid: str,
- secret: str,
- ):
- if userid is None or secret is None:
- raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
-
- print(f'userid: {userid},secret: {secret}')
- if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:
- return 'pass'
- else:
- return 'fail'
-
- @tool
- def search(query:str):
- """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
- serp = SerpAPIWrapper()
- result = serp.run(query)
- print("实时搜索结果:", result)
- return result
-
-
- def get_prompt():
- prompt = hub.pull("hwchase17/structured-chat-agent")
-
- return prompt
-
- async def chat(query):
- global llm,tools
- agent = create_structured_chat_agent(
- llm, tools, get_prompt()
- )
-
- agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
-
- events = agent_executor.astream_events({"input": query}, version="v1")
- async for event in events:
- type = event['event']
- if 'on_chat_model_stream' == type:
- data = event['data']
- chunk = data['chunk']
- content = chunk.content
- if content and len(content) > 0:
- print(content)
- yield content
-
-
- @app.websocket("/ws")
- async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):
- await manager.connect(websocket)
- try:
- while True:
- text = await websocket.receive_text()
-
- if 'fail' == permission:
- await manager.send_personal_message(
- f"authentication failed", websocket
- )
- else:
- if text is not None and len(text) > 0:
- async for msg in chat(text):
- await manager.send_personal_message(msg, websocket)
-
- except WebSocketDisconnect:
- manager.disconnect(websocket)
- print(f"Client #{userid} left the chat")
- await manager.broadcast(f"Client #{userid} left the chat")
-
- if __name__ == '__main__':
- tools = [search]
- llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)
- uvicorn.run(app, host='0.0.0.0',port=7777)
客户端:
- <!DOCTYPE html>
- <html>
- <head>
- <title>Chat</title>
- </head>
- <body>
- <h1>WebSocket Chat</h1>
- <form action="" onsubmit="sendMessage(event)">
- <label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label>
- <label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label>
- <br/>
- <button onclick="connect(event)">Connect</button>
- <hr>
- <label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
- <button>Send</button>
- </form>
- <ul id='messages'>
- </ul>
- <script>
- var ws = null;
- function connect(event) {
- var userid = document.getElementById("userid")
- var secret = document.getElementById("secret")
- ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);
- ws.onmessage = function(event) {
- var messages = document.getElementById('messages')
- var message = document.createElement('li')
- var content = document.createTextNode(event.data)
- message.appendChild(content)
- messages.appendChild(message)
- };
- event.preventDefault()
- }
- function sendMessage(event) {
- var input = document.getElementById("messageText")
- ws.send(input.value)
- input.value = ''
- event.preventDefault()
- }
- </script>
- </body>
- </html>
调用结果:
用户输入:你好
不需要触发工具调用
模型输出:
用户输入:广州现在天气如何?
需要调用工具
模型输出:
- ```
- Action:
- ```
- {
- "action": "Final Answer",
- "action_input": "广州现在的天气是多云,温度为87华氏度,降水概率为7%,湿度为76%,风力为7英里/小时。"
- }
- ```
PS:
1. 上面仅用于演示流式输出的效果,里面包含一些冗余的信息,例如:"action": "Final Answer",要根据实际情况过滤。
2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。