当前位置:   article > 正文

fastAPI Tortois-ORM websocket 多个客户端实现_fastapi websocket多客户端并发

fastapi websocket多客户端并发

实现 FastAPI、Tortoise-ORM 和 WebSocket 多个客户端的功能,您需要完成以下步骤:

  1. 安装 FastAPI、Tortoise-ORM 和 WebSocket 库 您可以使用 pip 安装这些库,例如:
    1. pip install fastapi
    2. pip install uvicorn
    3. pip install tortoise-orm
    4. pip install websockets
  2. 实现 Tortoise-ORM 数据库模型 首先,您需要定义数据库模型,例如:
    1. from tortoise.models import Model
    2. from tortoise import fields
    3. class User(Model):
    4. id = fields.UUIDField(pk=True)
    5. name = fields.CharField(max_length=50)
    6. age = fields.IntField()
  3. 实现 FastAPI 路由和 WebSocket 处理程序 您可以在 FastAPI 应用程序中定义路由和 WebSocket 处理程序,例如:
    1. from fastapi import FastAPI, WebSocket
    2. from tortoise.contrib.fastapi import register_tortoise
    3. app = FastAPI()
    4. @app.websocket("/ws/{client_id}")
    5. async def websocket_endpoint(websocket: WebSocket, client_id: str):
    6. await websocket.accept()
    7. while True:
    8. try:
    9. data = await websocket.receive_json()
    10. # 在这里处理客户端发送的数据
    11. # 这里可以用 Tortoise-ORM 查询数据库,然后将结果返回给客户端
    12. except:
    13. break
    14. register_tortoise(
    15. app,
    16. db_url="sqlite://db.sqlite3",
    17. modules={"models": ["app.models"]},
    18. generate_schemas=True,
    19. )
    20. if __name__ == "__main__":
    21. import uvicorn
    22. uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

  4. 启动应用程序并测试 WebSocket 您可以使用浏览器中的 WebSocket 客户端插件测试 WebSocket,例如 Chrome 中的 Simple WebSocket Client 插件。在插件中输入 WebSocket URL ws://localhost:8000/ws/client-1,然后发送 JSON 数据,就可以将数据发送给服务器并接收来自服务器的响应。
  5. 添加多个客户端支持 您可以为每个客户端创建一个新的 WebSocket 连接,并使用不同的客户端 ID 标识它们。然后,在服务器端,您可以使用客户端 ID 区分不同的客户端并将数据发送回正确的客户端。

下面是一个具体实现的示例代码:

  1. from fastapi import FastAPI, WebSocket
  2. from typing import Dict
  3. from tortoise.contrib.fastapi import register_tortoise
  4. from tortoise.transactions import in_transaction
  5. import uuid
  6. app = FastAPI()
  7. clients: Dict[str, WebSocket] = {}
  8. @app.websocket("/ws/{client_id}")
  9. async def websocket_endpoint(websocket: WebSocket, client_id: str):
  10. await websocket.accept()
  11. clients[client_id] = websocket
  12. try:
  13. while True:
  14. data = await websocket.receive_json()
  15. await send_message_to_all_clients(data)
  16. except Exception as e:
  17. print(f"WebSocket error: {e}")
  18. finally:
  19. del clients[client_id]
  20. async def send_message_to_all_clients(data: dict):
  21. async with in_transaction():
  22. client_id = data["client_id"]
  23. message = data["message"]
  24. await save_message_to_database(client_id, message)
  25. for client, ws in clients.items():
  26. if client != client_id:
  27. await ws.send_json({"client_id": client_id, "message": message})
  28. async def save_message_to_database(client_id: str, message: str):
  29. # save the message to database using Tortoise-ORM
  30. ...
  31. register_tortoise(
  32. app,
  33. db_url="sqlite://db.sqlite3",
  34. modules={"models": ["app.models"]},
  35. generate_schemas=True,
  36. )
  37. if __name__ == "__main__":
  38. import uvicorn
  39. uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

在这个示例中,我们添加了 clients 字典来存储所有连接的客户端,并使用客户端 ID 作为字典的键。在客户端连接时,我们将客户端添加到 clients 字典中。在客户端断开连接时,我们将客户端从 clients 字典中删除。每次收到一个客户端的消息时,我们将消息发送给所有连接的客户端,除了当前客户端。我们还使用 Tortoise-ORM 将消息保存到数据库中。

注意,这是一个简单的示例实现,您可以根据实际需求进行修改和完善。

下面是一个简单的封装示例:

  1. from typing import Dict
  2. from fastapi import FastAPI, WebSocket
  3. from tortoise.contrib.fastapi import register_tortoise
  4. from tortoise.transactions import in_transaction
  5. app = FastAPI()
  6. class WebSocketManager:
  7. def __init__(self):
  8. self.clients: Dict[str, WebSocket] = {}
  9. async def connect(self, websocket: WebSocket, client_id: str):
  10. await websocket.accept()
  11. self.clients[client_id] = websocket
  12. async def disconnect(self, client_id: str):
  13. del self.clients[client_id]
  14. async def send_message_to_all(self, data: dict):
  15. async with in_transaction():
  16. client_id = data["client_id"]
  17. message = data["message"]
  18. await self.save_message_to_database(client_id, message)
  19. for client, ws in self.clients.items():
  20. if client != client_id:
  21. await ws.send_json({"client_id": client_id, "message": message})
  22. async def save_message_to_database(self, client_id: str, message: str):
  23. # save the message to database using Tortoise-ORM
  24. ...
  25. manager = WebSocketManager()
  26. @app.websocket("/ws/{client_id}")
  27. async def websocket_endpoint(websocket: WebSocket, client_id: str):
  28. await manager.connect(websocket, client_id)
  29. try:
  30. while True:
  31. data = await websocket.receive_json()
  32. await manager.send_message_to_all(data)
  33. except Exception as e:
  34. print(f"WebSocket error: {e}")
  35. finally:
  36. await manager.disconnect(client_id)
  37. register_tortoise(
  38. app,
  39. db_url="sqlite://db.sqlite3",
  40. modules={"models": ["app.models"]},
  41. generate_schemas=True,
  42. )
  43. if __name__ == "__main__":
  44. import uvicorn
  45. uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

在这个示例中,我们创建了一个 WebSocketManager 类来封装 WebSocket 的管理逻辑。在 WebSocketManager 类中,我们将 clients 字典和一些 WebSocket 相关的方法封装在一起。在 websocket_endpoint 函数中,我们只需要调用 WebSocketManager 的相应方法就可以处理 WebSocket 连接和消息发送的逻辑。

这个示例是一个简单的封装示例,请根据实际需求进行修改和完善。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/892245
推荐阅读
相关标签
  

闽ICP备14008679号