当前位置:   article > 正文

开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(六)

开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(六)

 一、前言

    使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。

    FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,FastAPI 还提供了容器化部署能力,开发者可以轻松打包 AI 模型为 Docker 镜像,实现跨环境的部署和扩展。

    总之,使用 FastAPI 可以大大提高 AI 应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。

    本篇在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)基础上,学习如何集成Tool获取实时数据,并以流式方式返回


二、术语

2.1.Tool

    Tool(工具)是为了增强其语言模型的功能和实用性而设计的一系列辅助手段,用于扩展模型的能力。例如代码解释器(Code Interpreter)和知识检索(Knowledge Retrieval)等都属于其工具。

2.2.langchain预置的tools

    https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools

   基本这些工具能满足大部分需求,具体使用参见:

2.3.LangChain支持流式输出的方法

  • stream:基本的流式传输方式,能逐步给出代理的动作和观察结果。
  • astream:异步的流式传输,用于异步处理需求的情况。
  • astream_events:更细致的流式传输,能流式传输代理的每个具体事件,如工具调用和结束、模型启动和结束等,便于深入了解和监控代理执行的详细过程。

2.4.langchainhub

    是 LangChain 相关工具的集合中心,其作用在于方便开发者发现和共享常用的提示(Prompt)、链、代理等。

    它受 Hugging Face Hub 启发,促进社区交流与协作,推动 LangChain 生态发展。当前,它在新架构中被置于 LangSmith 里,主要聚焦于 Prompt。

2.5.asyncio

    是一个用于编写并发代码的标准库,它提供了构建异步应用程序的基础框架。


三、前置条件

3.1. 创建虚拟环境&安装依赖

  增加Google Search以及langchainhub的依赖包

  1. conda create -n fastapi_test python=3.10
  2. conda activate fastapi_test
  3. pip install fastapi websockets uvicorn
  4. pip install --quiet langchain-core langchain-community langchain-openai
  5. pip install google-search-results langchainhub

3.2. 注册Google Search API账号

参见:开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)

3.3. 生成Google Search API的KEY


四、技术实现

4.1. 使用Tool&流式输出

  1. # -*- coding: utf-8 -*-
  2. import asyncio
  3. import os
  4. from langchain.agents import create_structured_chat_agent, AgentExecutor
  5. from langchain_community.utilities.serpapi import SerpAPIWrapper
  6. from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, ChatPromptTemplate
  7. from langchain_core.tools import tool
  8. from langchain_openai import ChatOpenAI
  9. os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的Open AI Key
  10. os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  11. llm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)
  12. @tool
  13. def search(query:str):
  14. """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
  15. serp = SerpAPIWrapper()
  16. result = serp.run(query)
  17. print("实时搜索结果:", result)
  18. return result
  19. tools = [search]
  20. template='''
  21. Respond to the human as helpfully and accurately as possible. You have access to the following tools:
  22. {tools}
  23. Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).
  24. Valid "action" values: "Final Answer" or {tool_names}
  25. Provide only ONE action per $JSON_BLOB, as shown:
  26. ```
  27. {{
  28. "action": $TOOL_NAME,
  29. "action_input": $INPUT
  30. }}
  31. ```
  32. Follow this format:
  33. Question: input question to answer
  34. Thought: consider previous and subsequent steps
  35. Action:
  36. ```
  37. $JSON_BLOB
  38. ```
  39. Observation: action result
  40. ... (repeat Thought/Action/Observation N times)
  41. Thought: I know what to respond
  42. Action:
  43. ```
  44. {{
  45. "action": "Final Answer",
  46. "action_input": "Final response to human"
  47. }}
  48. Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation
  49. '''
  50. system_message_prompt = SystemMessagePromptTemplate.from_template(template)
  51. human_template='''
  52. {input}
  53. {agent_scratchpad}
  54. (reminder to respond in a JSON blob no matter what)
  55. '''
  56. human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
  57. prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])
  58. print(prompt)
  59. agent = create_structured_chat_agent(
  60. llm, tools, prompt
  61. )
  62. agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
  63. async def chat(params):
  64. events = agent_executor.astream_events(params,version="v2")
  65. async for event in events:
  66. type = event['event']
  67. if 'on_chat_model_stream' == type:
  68. data = event['data']
  69. chunk = data['chunk']
  70. content = chunk.content
  71. if content and len(content) > 0:
  72. print(content)
  73. asyncio.run(chat({"input": "广州现在天气如何?"}))

调用结果:

说明:

流式输出的数据结构为:

  1. {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='天', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
  2. type: on_chat_model_stream
  3. {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='气', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}

4.2. 通过langchainhub使用公共prompt

   在4.1使用Tool&流式输出的代码基础上进行调整

  1. # -*- coding: utf-8 -*-
  2. import asyncio
  3. import os
  4. from langchain.agents import create_structured_chat_agent, AgentExecutor
  5. from langchain_community.utilities.serpapi import SerpAPIWrapper
  6. from langchain_core.tools import tool
  7. from langchain_openai import ChatOpenAI
  8. os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的Open AI Key
  9. os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  10. from langchain import hub
  11. llm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)
  12. @tool
  13. def search(query:str):
  14. """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
  15. serp = SerpAPIWrapper()
  16. result = serp.run(query)
  17. print("实时搜索结果:", result)
  18. return result
  19. tools = [search]
  20. prompt = hub.pull("hwchase17/structured-chat-agent")
  21. print(prompt)
  22. agent = create_structured_chat_agent(
  23. llm, tools, prompt
  24. )
  25. agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
  26. async def chat(params):
  27. events = agent_executor.astream_events(params,version="v2")
  28. async for event in events:
  29. type = event['event']
  30. if 'on_chat_model_stream' == type:
  31. data = event['data']
  32. chunk = data['chunk']
  33. content = chunk.content
  34. if content and len(content) > 0:
  35. print(content)
  36. asyncio.run(chat({"input": "广州现在天气如何?"}))

调用结果:

4.3. 整合代码

开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)的代码基础上进行调整

  1. import uvicorn
  2. import os
  3. from typing import Annotated
  4. from fastapi import (
  5. Depends,
  6. FastAPI,
  7. WebSocket,
  8. WebSocketException,
  9. WebSocketDisconnect,
  10. status,
  11. )
  12. from langchain import hub
  13. from langchain.agents import create_structured_chat_agent, AgentExecutor
  14. from langchain_community.utilities import SerpAPIWrapper
  15. from langchain_core.tools import tool
  16. from langchain_openai import ChatOpenAI
  17. os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 你的Open AI Key
  18. os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  19. class ConnectionManager:
  20. def __init__(self):
  21. self.active_connections: list[WebSocket] = []
  22. async def connect(self, websocket: WebSocket):
  23. await websocket.accept()
  24. self.active_connections.append(websocket)
  25. def disconnect(self, websocket: WebSocket):
  26. self.active_connections.remove(websocket)
  27. async def send_personal_message(self, message: str, websocket: WebSocket):
  28. await websocket.send_text(message)
  29. async def broadcast(self, message: str):
  30. for connection in self.active_connections:
  31. await connection.send_text(message)
  32. manager = ConnectionManager()
  33. app = FastAPI()
  34. async def authenticate(
  35. websocket: WebSocket,
  36. userid: str,
  37. secret: str,
  38. ):
  39. if userid is None or secret is None:
  40. raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
  41. print(f'userid: {userid},secret: {secret}')
  42. if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:
  43. return 'pass'
  44. else:
  45. return 'fail'
  46. @tool
  47. def search(query:str):
  48. """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
  49. serp = SerpAPIWrapper()
  50. result = serp.run(query)
  51. print("实时搜索结果:", result)
  52. return result
  53. def get_prompt():
  54. prompt = hub.pull("hwchase17/structured-chat-agent")
  55. return prompt
  56. async def chat(query):
  57. global llm,tools
  58. agent = create_structured_chat_agent(
  59. llm, tools, get_prompt()
  60. )
  61. agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)
  62. events = agent_executor.astream_events({"input": query}, version="v1")
  63. async for event in events:
  64. type = event['event']
  65. if 'on_chat_model_stream' == type:
  66. data = event['data']
  67. chunk = data['chunk']
  68. content = chunk.content
  69. if content and len(content) > 0:
  70. print(content)
  71. yield content
  72. @app.websocket("/ws")
  73. async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):
  74. await manager.connect(websocket)
  75. try:
  76. while True:
  77. text = await websocket.receive_text()
  78. if 'fail' == permission:
  79. await manager.send_personal_message(
  80. f"authentication failed", websocket
  81. )
  82. else:
  83. if text is not None and len(text) > 0:
  84. async for msg in chat(text):
  85. await manager.send_personal_message(msg, websocket)
  86. except WebSocketDisconnect:
  87. manager.disconnect(websocket)
  88. print(f"Client #{userid} left the chat")
  89. await manager.broadcast(f"Client #{userid} left the chat")
  90. if __name__ == '__main__':
  91. tools = [search]
  92. llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)
  93. uvicorn.run(app, host='0.0.0.0',port=7777)

客户端:

  1. <!DOCTYPE html>
  2. <html>
  3. <head>
  4. <title>Chat</title>
  5. </head>
  6. <body>
  7. <h1>WebSocket Chat</h1>
  8. <form action="" onsubmit="sendMessage(event)">
  9. <label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label>
  10. <label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label>
  11. <br/>
  12. <button onclick="connect(event)">Connect</button>
  13. <hr>
  14. <label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
  15. <button>Send</button>
  16. </form>
  17. <ul id='messages'>
  18. </ul>
  19. <script>
  20. var ws = null;
  21. function connect(event) {
  22. var userid = document.getElementById("userid")
  23. var secret = document.getElementById("secret")
  24. ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);
  25. ws.onmessage = function(event) {
  26. var messages = document.getElementById('messages')
  27. var message = document.createElement('li')
  28. var content = document.createTextNode(event.data)
  29. message.appendChild(content)
  30. messages.appendChild(message)
  31. };
  32. event.preventDefault()
  33. }
  34. function sendMessage(event) {
  35. var input = document.getElementById("messageText")
  36. ws.send(input.value)
  37. input.value = ''
  38. event.preventDefault()
  39. }
  40. </script>
  41. </body>
  42. </html>

调用结果:

用户输入:你好

不需要触发工具调用

模型输出:

用户输入:广州现在天气如何?

需要调用工具

模型输出:

  1. ```
  2. Action:
  3. ```
  4. {
  5. "action": "Final Answer",
  6. "action_input": "广州现在的天气是多云,温度为87华氏度,降水概率为7%,湿度为76%,风力为7英里/小时。"
  7. }
  8. ```

PS:

1. 上面仅用于演示流式输出的效果,里面包含一些冗余的信息,例如:"action": "Final Answer",要根据实际情况过滤。

2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号